Quickstart

Learn to develop and control K-Scale robots with our open-source software.

Introduction

pykos is a Python client for the K-Scale Operating System (KOS), allowing you to interact with various services provided by KOS, such as controlling actuators, accessing IMU data, managing processes, and more.

Installation

To install pykos, you can use pip:

pip install pykos

Getting Started

First, import the KOS class from pykos and create a client instance. Before connecting, make sure the robot is reachable, either over a direct cable connection or through its IP address on your network.

from pykos import KOS

# Connect to KOS running on localhost at port 50051
client = KOS(ip='localhost', port=50051)

Replace 'localhost' and 50051 with the IP address and port where your KOS server is running.
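
Before creating the client, it can help to confirm that something is listening at the address you plan to use. The sketch below is not part of pykos: `kos_reachable` is a hypothetical helper that only opens and closes a plain TCP connection with the standard library. The 2-second timeout is an arbitrary choice.

```python
import socket


def kos_reachable(ip: str, port: int = 50051, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (ip, port) can be opened.

    Hypothetical helper, not a pykos API. It only checks that the port
    accepts connections, not that a KOS server is behind it.
    """
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


# Example: check the default address used in this guide.
# if not kos_reachable("localhost", 50051):
#     print("KOS is not reachable; check the cable or the IP address.")
```

A successful check does not guarantee that the gRPC service itself is healthy, so treat it as a quick first diagnostic only.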

Services

The KOS client provides several services, each accessible via the client object. Below are the services and examples of how to use them.

Actuator Service

Control actuators connected to the robot.

Actuator Definition

Use this convention to assign names to actuator IDs:

ACTUATOR_NAME_TO_ID = {
    "left_shoulder_yaw": 11,
    "left_shoulder_pitch": 12,
    "left_elbow_yaw": 13,
    "left_gripper": 14,
    "right_shoulder_yaw": 21,
    "right_shoulder_pitch": 22,
    "right_elbow_yaw": 23,
    "right_gripper": 24,
    "left_hip_yaw": 31,
    "left_hip_roll": 32,
    "left_hip_pitch": 33,
    "left_knee_pitch": 34,
    "left_ankle_pitch": 35,
    "right_hip_yaw": 41,
    "right_hip_roll": 42,
    "right_hip_pitch": 43,
    "right_knee_pitch": 44,
    "right_ankle_pitch": 45,
}

ACTUATOR_ID_TO_NAME = {v: k for k, v in ACTUATOR_NAME_TO_ID.items()}
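
One way to use this mapping is to write targets by joint name and translate them into ID-based command dictionaries, the shape passed to `command_actuators` in the Command example later in this guide. The sketch below is an illustration, not a pykos API: `build_commands`, `limb_of`, and `LIMB_BY_TENS_DIGIT` are hypothetical names. The limb grouping by tens digit is inferred from the table above rather than stated by it.

```python
# Subset of the mapping above, repeated so this sketch runs on its own.
ACTUATOR_NAME_TO_ID = {
    "left_shoulder_yaw": 11,
    "left_gripper": 14,
    "right_shoulder_yaw": 21,
    "left_hip_yaw": 31,
    "right_ankle_pitch": 45,
}

# Assumption inferred from the table: the tens digit identifies the limb.
LIMB_BY_TENS_DIGIT = {1: "left arm", 2: "right arm", 3: "left leg", 4: "right leg"}


def limb_of(actuator_id: int) -> str:
    """Return the limb an actuator ID belongs to under the inferred convention."""
    return LIMB_BY_TENS_DIGIT[actuator_id // 10]


def build_commands(targets: dict[str, float]) -> list[dict]:
    """Turn {joint_name: position} into a list of {"actuator_id", "position"} dicts."""
    unknown = [name for name in targets if name not in ACTUATOR_NAME_TO_ID]
    if unknown:
        raise KeyError(f"Unknown actuator names: {unknown}")
    return [
        {"actuator_id": ACTUATOR_NAME_TO_ID[name], "position": position}
        for name, position in targets.items()
    ]


commands = build_commands({"left_shoulder_yaw": 10.0, "right_shoulder_yaw": -10.0})
```

Rejecting unknown names up front avoids sending a command to the wrong joint because of a typo.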

Example: Configure an Actuator

# Configure actuator with ID 1
response = client.actuator.configure_actuator(
    actuator_id=1,
    kp=32,
    kd=32,
    ki=32,
    max_torque=100.0,
    torque_enabled=True,
    zero_position=True
)
if response.success:
    print("Actuator configured successfully.")
else:
    print(f"Failed to configure actuator: {response.error}")

Example: Command Actuators

# Command multiple actuators
commands = [
    {"actuator_id": 1, "position": 90.0},
    {"actuator_id": 2, "position": 180.0}
]
response = client.actuator.command_actuators(commands)
for result in response.results:
    if result.success:
        print(f"Actuator {result.actuator_id} commanded successfully.")
    else:
        print(f"Failed to command actuator {result.actuator_id}: {result.error}")
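
When many actuators are commanded at once, it may be easier to collect only the failures than to print every result. The sketch below assumes each result exposes `success`, `actuator_id`, and `error`, as in the loop above. `failed_commands` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins so the code runs without a robot.

```python
from types import SimpleNamespace


def failed_commands(results) -> dict:
    """Map actuator_id -> error for every result whose success flag is false."""
    return {r.actuator_id: r.error for r in results if not r.success}


# Stand-in results for illustration only; real ones come from response.results.
fake_results = [
    SimpleNamespace(actuator_id=1, success=True, error=None),
    SimpleNamespace(actuator_id=2, success=False, error="timeout"),
]

print(failed_commands(fake_results))  # {2: 'timeout'}
```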

IMU Service

Access the Inertial Measurement Unit (IMU) data.

Example: Get IMU Values

# Get basic IMU sensor values
imu_values = client.imu.get_imu_values()
print(f"Accelerometer X: {imu_values.accel_x}")
print(f"Gyroscope Z: {imu_values.gyro_z}")

Example: Zero the IMU

# Zero the IMU, passing an explicit duration of 1.0
response = client.imu.zero(duration=1.0)
if response.success:
    print("IMU zeroed successfully.")
else:
    print(f"Failed to zero IMU: {response.error}")

LED Matrix Service

Control an LED matrix display.

Example: Get Matrix Info

# Get LED matrix information
matrix_info = client.led_matrix.get_matrix_info()
print(f"Matrix size: {matrix_info.width} x {matrix_info.height}")

Example: Write to the LED Matrix

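# Create a buffer to turn all LEDs on (the // 8 implies one bit per LED)
# matrix_info is the object returned by get_matrix_info() in the previous example
buffer_size = (matrix_info.width * matrix_info.height) // 8
buffer = bytes([0xFF] * buffer_size)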

# Write buffer to the LED matrix
response = client.led_matrix.write_buffer(buffer)
if response.success:
    print("LED matrix updated successfully.")
else:
    print(f"Failed to update LED matrix: {response.error}")
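
To draw a pattern rather than light every LED, the buffer has to be packed bit by bit. The sketch below assumes one bit per LED (implied by the `// 8` above), row-major order, and the first pixel in the most significant bit of each byte. The bit and row order are assumptions not confirmed by this guide, so verify them on your hardware. `pack_pixels` and `checkerboard` are hypothetical helpers.

```python
def pack_pixels(pixels: list[list[bool]]) -> bytes:
    """Pack a row-major grid of on/off pixels into bytes, 8 pixels per byte.

    Assumes MSB-first bit order within each byte (an unverified assumption).
    """
    flat = [bool(p) for row in pixels for p in row]
    if len(flat) % 8 != 0:
        raise ValueError("pixel count must be a multiple of 8")
    out = bytearray()
    for i in range(0, len(flat), 8):
        byte = 0
        for bit in flat[i:i + 8]:
            # Shift left so the first pixel ends up in the most significant bit.
            byte = (byte << 1) | int(bit)
        out.append(byte)
    return bytes(out)


def checkerboard(width: int, height: int) -> list[list[bool]]:
    """Return a grid where pixels alternate on and off in both directions."""
    return [[(x + y) % 2 == 0 for x in range(width)] for y in range(height)]


buffer = pack_pixels(checkerboard(32, 16))  # 32 * 16 / 8 = 64 bytes
```

The resulting `buffer` has the same length as the all-on buffer for a matrix of the same size, so it could be passed to `write_buffer` in the same way.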

Sound Service

Play and record audio using the robot's speakers and microphones.

❗️

WIP KOS Feature: The sound service requires an environment set up with Alpine Linux.

Preparing the Environment

  1. Copy the test script to the robot:

     scp -O audio_test.py root@<KOS_IP_ADDRESS>:/root/alpine/root/

     Replace <KOS_IP_ADDRESS> with the IP address of your KOS device.

  2. Connect to the robot via SSH and prepare the environment:

     ssh root@<KOS_IP_ADDRESS>
     chroot /root/alpine
     cd /root
     source venv/bin/activate

Running the Audio Test Script

python3 audio_test.py

To record longer audio (e.g., 30 seconds):

python3 audio_test.py --duration 30

Copying Recorded Audio Back to Your Computer

scp -O root@<KOS_IP_ADDRESS>:/root/alpine/root/recorded_audio.wav .

Example: Audio Test Script

import sounddevice as sd
import soundfile as sf
import argparse

# Default parameters
DEFAULT_DURATION = 5  # Default recording duration in seconds

def main(duration):
    # File to store the recorded audio
    output_file = "recorded_audio.wav"

    # Device parameters
    record_device_id = 0      # Microphone device index
    playback_device_id = 1    # Speaker device index
    channels = [2]            # Use channel 2 of the microphone
    samplerate = 44100        # Sampling rate in Hz

    try:
        # Step 1: Record audio
        print(f"Recording for {duration} seconds...")
        audio_data = sd.rec(int(duration * samplerate), samplerate=samplerate, 
                            channels=len(channels), device=record_device_id, dtype='float32', mapping=channels)
        sd.wait()  # Wait until recording is finished
        print("Recording finished.")

        # Save the recorded audio to a WAV file
        sf.write(output_file, audio_data, samplerate)
        print(f"Audio saved to {output_file}")

        # Step 2: Play back the recorded audio
        print("Playing back the recorded audio...")
        data, samplerate = sf.read(output_file)
        sd.play(data, samplerate=samplerate, device=playback_device_id)
        sd.wait()  # Wait for playback to complete
        print("Playback finished.")
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Record and play back audio with adjustable duration."
    )
    # Add argument for duration
    parser.add_argument(
        "--duration",
        type=int,
        default=DEFAULT_DURATION,
        help="Recording duration in seconds (default: 5).",
    )
    args = parser.parse_args()

    # Run the main function
    main(args.duration)
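
The script records `int(duration * samplerate)` frames, so longer recordings grow linearly in memory. The sketch below works through that arithmetic for the in-memory float32 array (4 bytes per sample). `recording_size` is a hypothetical helper. The size of the saved WAV file can differ, since it depends on the sample format soundfile writes.

```python
def recording_size(duration_s: float, samplerate: int = 44100,
                   channels: int = 1, bytes_per_sample: int = 4) -> tuple[int, int]:
    """Return (frames, bytes) for an in-memory recording buffer.

    bytes_per_sample defaults to 4 because the script records float32 samples.
    """
    frames = int(duration_s * samplerate)
    return frames, frames * channels * bytes_per_sample


frames, nbytes = recording_size(30)
print(frames, nbytes)  # 1323000 5292000
```

A 30-second single-channel recording at 44100 Hz therefore needs roughly 5.3 MB of memory before it is written to disk.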

Computer Vision with KOS

Integrate computer vision capabilities with KOS using WebRTC.

❗️

WIP KOS Feature: The computer vision service requires a WebRTC connection.

Requirements

Install the necessary Python packages:

pip install aiortc opencv-python requests

Example: Display Video Stream

import asyncio
import base64
import requests
import cv2
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
import logging

logging.getLogger('ffmpeg').setLevel(logging.ERROR)

# Server URL (replace with your actual server URL)
SERVER_URL = "http://<KOS_IP_ADDRESS>:8083/stream/s1/channel/0/webrtc?uuid=s1&channel=0"

# Video Display Track
class VideoDisplay(VideoStreamTrack):
    def __init__(self, track):
        super().__init__()
        self.track = track

    async def recv(self):
        frame = await self.track.recv()
        img = frame.to_ndarray(format="bgr24")
        # Do something with the frame, e.g., display or object detection
        cv2.imshow("WebRTC Video", img)
        cv2.waitKey(1)
        return frame

async def main():
    pc = RTCPeerConnection()
    pc.addTransceiver("video", direction="recvonly")

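    @pc.on("track")
    def on_track(track):
        if track.kind == "video":
            display = VideoDisplay(track)

            async def consume_frames():
                # Keep pulling frames; a single recv() call would show only one frame
                while True:
                    await display.recv()

            asyncio.ensure_future(consume_frames())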

    # Create and set local description
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)

    # Send offer to server
    sdp_offer_base64 = base64.b64encode(pc.localDescription.sdp.encode("utf-8")).decode("utf-8")
    # Prepare headers and data
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "X-Requested-With": "XMLHttpRequest",
    }
    data = {"data": sdp_offer_base64}

    # Send HTTP POST request
    response = requests.post(SERVER_URL, headers=headers, data=data, verify=False)
    response.raise_for_status()

    # Decode the base64 SDP answer from the server
    sdp_answer = base64.b64decode(response.text).decode("utf-8")

    # Set remote description
    await pc.setRemoteDescription(RTCSessionDescription(sdp=sdp_answer, type="answer"))

    try:
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        cv2.destroyAllWindows()
        await pc.close()

if __name__ == "__main__":
    asyncio.run(main())

Explanation

  • Server URL: Replace <KOS_IP_ADDRESS> with the IP address of your KOS device.
  • Video Processing: Inside the recv method, frames are received and can be processed or simply displayed using OpenCV.
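  • SSL Verification: verify=False in the requests.post call disables TLS certificate verification. It has no effect on the plain http:// URL used here. If your server is served over HTTPS, remove verify=False or pass the path to a trusted CA bundle via verify instead.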
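
The signalling step in the example sends the SDP offer as base64 text and decodes the answer the same way. The sketch below isolates that encoding so it can be checked without a server. `encode_sdp` and `decode_sdp` are hypothetical helper names, and the sample SDP string is a shortened placeholder rather than a real offer.

```python
import base64


def encode_sdp(sdp: str) -> str:
    """Encode an SDP string as base64 text, as done for the offer in the example."""
    return base64.b64encode(sdp.encode("utf-8")).decode("utf-8")


def decode_sdp(payload: str) -> str:
    """Decode a base64 payload back into an SDP string, as done for the answer."""
    return base64.b64decode(payload).decode("utf-8")


# Placeholder SDP fragment for illustration only.
sample = "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\n"
round_trip = decode_sdp(encode_sdp(sample))
print(round_trip == sample)  # True
```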

Process Manager Service

Manage processes on the robot.

Example: Start KClip Recording

# Start KClip recording with an action
response = client.process_manager.start_kclip(action="wave")
if response.success:
    print(f"KClip recording started. Clip UUID: {response.clip_uuid}")
else:
    print(f"Failed to start KClip recording: {response.error}")

Example: Stop KClip Recording

# Stop KClip recording
response = client.process_manager.stop_kclip()
if response.success:
    print(f"KClip recording stopped. Clip UUID: {response.clip_uuid}")
else:
    print(f"Failed to stop KClip recording: {response.error}")

Inference Service

Upload models and perform inference.

Example: Upload a Model

# Upload a model file
with open('model.bin', 'rb') as f:
    model_data = f.read()

metadata = {
    "model_name": "MyModel",
    "model_description": "A sample model",
    "model_version": "1.0.0",
    "model_author": "Your Name"
}
response = client.inference.upload_model(model_data, metadata)
if response.success:
    print(f"Model uploaded successfully. UID: {response.model_uid}")
else:
    print(f"Failed to upload model: {response.error}")

Example: Run Inference

# Prepare input tensor
input_tensor = {
    "values": [0.1, 0.2, 0.3],
    "shape": [{"size": 3, "name": "input", "dynamic": False}]
}
inputs = {"input_tensor": input_tensor}

# Perform inference
response = client.inference.forward(model_uid="your_model_uid", inputs=inputs)
if not response.get('error'):
    outputs = response['outputs']
    print(f"Inference outputs: {outputs}")
else:
    print(f"Inference failed: {response['error']}")
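
Building the tensor dictionary by hand makes it easy for the declared shape to disagree with the number of values. The sketch below builds the same structure as the example above and checks that the two match. `make_tensor` is a hypothetical helper, not a pykos API. It assumes the values are already flattened and that every dimension is static.

```python
from math import prod


def make_tensor(values: list, dims: list[tuple[str, int]]) -> dict:
    """Build a tensor dict from flat values and a list of (name, size) dimensions."""
    sizes = [size for _, size in dims]
    if prod(sizes) != len(values):
        raise ValueError(
            f"shape {sizes} holds {prod(sizes)} values, but {len(values)} were given"
        )
    return {
        "values": [float(v) for v in values],
        "shape": [{"size": size, "name": name, "dynamic": False} for name, size in dims],
    }


input_tensor = make_tensor([0.1, 0.2, 0.3], [("input", 3)])
inputs = {"input_tensor": input_tensor}
```

Catching a shape mismatch locally gives a clearer error than a failed inference call would.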

Simulation Service

Interact with the simulation environment.

Example: Reset the Simulation

# Reset simulation with specific parameters
response = client.sim.reset(
    initial_state={"qpos": [0.0, 0.0, 0.0]},
    randomize=True
)
if response.success:
    print("Simulation reset successfully.")
else:
    print(f"Failed to reset simulation: {response.error}")

Example: Step the Simulation

# Step the simulation forward by 10 steps
response = client.sim.step(num_steps=10)
if response.success:
    print("Simulation stepped forward.")
else:
    print(f"Failed to step simulation: {response.error}")

Closing the Client

When you're done using the client, be sure to close the gRPC channel:

client.close()
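
If an exception is raised before `client.close()` runs, the channel stays open. One option is the standard library's `contextlib.closing`, which calls `close()` on exit even when the body fails. The sketch below uses a stand-in `FakeClient` class so it runs without a robot. With a real client the same pattern would wrap `KOS(ip=..., port=...)`. Whether the KOS client can be used directly in a `with` statement is not stated in this guide, so `closing` is used here as a conservative choice.

```python
from contextlib import closing


class FakeClient:
    """Stand-in with a close() method, for illustration only."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def run_session(client, fail: bool = False) -> None:
    """Use the client inside closing() so close() runs on success and on error."""
    with closing(client):
        if fail:
            raise RuntimeError("simulated failure mid-session")


ok_client = FakeClient()
run_session(ok_client)
print(ok_client.closed)  # True
```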