ゲームパッドでラズタンクを操作する

CoderDojo青梅 午後の部の参加者に朗報です!
これまでラズタンクのリモート操作は、第二期までPCキーボードの矢印キーで動かし、第三期はNode-REDのダッシュボードのボタンで動かしました。ダッシュボードはスマホ画面のボタンでも操作できる優れたUI(ユーザー インターフェース)でした。今年、第四期ではUSBゲームパッドで操作のレスポンス向上にチャレンジします。ゲームパッドは、メンター西村さんが試作し、そのクイックなレスポンスに感動しました。この感動を午後の部参加者全員に体験していただきます。体験してほしいプログラミング技術は、(1)ブラウザに組み込まれたゲームパッドを使えるようにする navigator API と、(2)ブラウザとデバイスが双方向通信できる WebSocket 通信の2つです。

やってみよう!

システム構成

ノートPCのブラウザで実行する[gamepad.html]は、ラズパイ側WEBサーバー「Apache2」のHTMLコンテンツを配置する/var/www/htmlディレクトリに保存する。
ラズパイのターミナルで実行する[gamepad.py]は、ラズパイ側 ~/pythonディレクトリに保存する。

その1.Webクライアント[gamepad.html]

WEBブラウザで動く JavaScript コードを制作します。下記は、PCに接続したゲームパッドのボタンや上下左右スティックの状態をブラウザに表示するコードです。
このコードは、ラズパイのWebSocketサーバー[razpi**.localのポート番号 8765]にゲームパッドの入力値を送信します。

ファイル保存場所: /var/www/html/gamepad.html

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  <title>Gamepad to ラズタンク</title>
</head>
<body>
  <h2>Gamepad for ラズタンク操作</h2>
  <pre id="status">ゲームパッドを操作してください...</pre>
  <pre id="output">[]</pre>
  <pre id="visual">[]</pre>

<script>
    let socket = new WebSocket("ws://razpi43.local:8765/"); // ラズパイのURI
    let lastData = null; // 前回送信したデータを保持

    // [visual]のところに押されたボタンを表示する機能
    function renderVisual(buttons, axes) {
        const arrows = {
            up:    buttons[12] ? "⬆️" : "⬛",
            down:  buttons[13] ? "⬇️" : "⬛",
            left:  buttons[14] ? "⬅️" : "⬛",
            right: buttons[15] ? "➡️" : "⬛"
        };
        const faceButtons = {
            A: buttons[0] ? "🟢" : "⚪",
            B: buttons[1] ? "🔴" : "⚪",
            X: buttons[2] ? "🔵" : "⚪",
            Y: buttons[3] ? "🟡" : "⚪"
        };
        const sticks = `
            Left Stick: X=${axes[0].toFixed(2)} Y=${axes[1].toFixed(2)}
            Right Stick: X=${axes[2].toFixed(2)} Y=${axes[3].toFixed(2)}
        `;
        const visual = `
            D-Pad:
             ${arrows.up}
            ${arrows.left} ${arrows.right}
             ${arrows.down}
            Buttons: A=${faceButtons.A} B=${faceButtons.B} X=${faceButtons.X} Y=${faceButtons.Y}
            ${sticks}
        `;
      document.getElementById("visual").textContent = visual;
    }

    // ブラウザの[navigator]機能を使って、ゲームパッドのボタン入力を検知する
    function pollGamepad() {
        const gamepads = navigator.getGamepads();      // ゲームパッドの入力値を取得する
        const gp = gamepads[0];                        // 最大4台のUSBゲームパッドは1番だけ対象にする[1番=0]

        // ブラウザの画面にゲームパッドのボタン入力状態を表示
        if (gp) {
            const data = {
                id: gp.id,                               // ゲームパッドのID名(例: Xbox)
                buttons: gp.buttons.map(b => b.pressed), // スティック以外のボタン入力
                axes: gp.axes                            // スティック入力
            };
            const jsonData = JSON.stringify(data, null, 2);            // JSON形式に変換
            document.getElementById("output").textContent = jsonData;  // 変換したJSON形式データを表示
            renderVisual(data.buttons, data.axes);     // function定義した renderVisual() に入力値を渡す

            // ボタン入力値が前回と違う場合のみ、ラズパイにデータ送信
            if (socket.readyState === WebSocket.OPEN && jsonData !== lastData) {
                socket.send(jsonData);     // socket通信する
                lastData = jsonData;       // 送信したデータを記憶する
            }
        }

        requestAnimationFrame(pollGamepad);   // 繰り返す
    }

    // ずっと function定義した pollGamepad() を呼び出す(ポーリングする)
    window.addEventListener("gamepadconnected", () => {
        document.getElementById("status").textContent = "Gamepad connected";
        pollGamepad();
    });
</script></body>
</html>

WEBブラウザで動作検証

ノートPCのWEBブラウザで、ラズパイに保存したHTMLにアクセスします。
成功するとゲームパッドを操作すると配列の値が変化します。なお、期待通り動かない時は[F12]キーを押してDevToolsのコンソールにエラーが表示されていないか確認します。

ブラウザのURL: http://razpi**.local/gamepad.html

その2.ラズパイ側 WebSocketサーバー[gamepad.py]

次はラズパイで実行する pythonコードです。ノートPCからゲームパッドの入力値を受信して表示します。

ファイル保存場所: /home/pi/python/gamepad.py

import asyncio
import websockets
import json

DEADZONE = 0.2       # スティックの誤差を吸収する

# ボタン番号を日本語で表示するための定義
BUTTON_NAMES = {
    0: "A", 1: "B", 2: "X", 3: "Y",
    4: "LB", 5: "RB", 6: "LT", 7: "RT",
    8: "バック", 9: "スタート", 10: "左スティック押し込み",
    11: "右スティック押し込み", 12: "十字 ↑", 13: "十字 ↓",
    14: "十字 ←", 15: "十字 →", 16: "ガイド"
}

latest_state = {
    "Left_Stick": [],
    "Right_Stick": [],
    "押されているボタン": []
}

def button():
    return latest_state

# 左右スティックのx,y方向を抜き取り。動きなければ[ニュートラル]を代入。
def get_stick_direction(name, x, y):
    direction = []
    if abs(x) > DEADZONE:
        direction.append(f"{name} ← 左" if x < 0 else f"{name} → 右")
    if abs(y) > DEADZONE:
        direction.append(f"{name} ↑ 上" if y < 0 else f"{name} ↓ 下")
    return direction

# ボタンが押されているか判定する。押されなければ[なし]を代入
def get_pressed_buttons(buttons):
    return [
        BUTTON_NAMES.get(i, f"ボタン{i}")
        for i, pressed in enumerate(buttons)
        if pressed
    ]

# PCブラウザとのwebsocket接続に一度成功すると、ずっとJSON形式のメッセージを受信できるようになる
async def handler(websocket):
    async for message in websocket:
        try:
            data = json.loads(message)
            axes = data.get("axes", [])
            buttons = data.get("buttons", [])

            # 左スティック(axes[0], axes[1])
            lx = axes[0] if len(axes) > 0 else 0
            ly = axes[1] if len(axes) > 1 else 0
            left_direction = get_stick_direction("Left_Stick:", lx, ly)

            # 右スティック(axes[2], axes[3])
            rx = axes[2] if len(axes) > 2 else 0
            ry = axes[3] if len(axes) > 3 else 0
            right_direction = get_stick_direction("Right_Stick:", rx, ry)

            pressed = get_pressed_buttons(buttons)

            if left_direction:
                print(', '.join(left_direction))
            if right_direction:
                print(', '.join(right_direction))
            if pressed:
                print(', '.join(pressed))
            global latest_state
            latest_state = {
                "Left_Stick": left_direction,
                "Right_Stick": right_direction,
                "押されているボタン": pressed
            }

        except Exception as e:
            print(f"⚠️ エラー: {e}")

# WebSocketサーバーを立ち上げる
async def main():
    print("🎮 WebSocketサーバー起動中(ポート8765)...")
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()      # ずっと待機

# 外部から呼ばれる(from tank_gamepad.py)
def start_server():
    asyncio.run(main())    # ここが一番最初に実行される命令

# もしgamepad.pyを単独起動なら
if __name__ == '__main__':
    asyncio.run(main())    # ここが一番最初に実行される命令

ラズパイで動作検査

手順1. ラズパイでpythonコードを実行する

$ python gamepad.py

手順2. ノートPCブラウザでアクセス

http://razpi**.local/gamepad.html

手順3. ゲームパッドを検知するか検査

その3.ラズタンクを動かす[tank_gamepad.py]

いよいよ、ラズタンクを運転できる pythonコードです。ノートPC側ゲームパッドの入力値を受信してモーター駆動します。

ファイル保存場所: /home/pi/python/tank_gamepad.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# ---------------------------
# 初期設定
# ---------------------------
import asyncio, websockets, json, sys, threading, time
import RPi.GPIO as GPIO

# GPIO設定(自分のラズタンクのGPIO番号と一致させること)
LEFT_PWM, LEFT_IN1, LEFT_IN2 = 22, 17, 27
RIGHT_PWM, RIGHT_IN1, RIGHT_IN2 = 16, 21, 20

# GPIO番号でセットする(ピン番号じゃない)全て出力モードにする
GPIO.setmode(GPIO.BCM)
for pin in [LEFT_PWM, LEFT_IN1, LEFT_IN2, RIGHT_PWM, RIGHT_IN1, RIGHT_IN2]:
    GPIO.setup(pin, GPIO.OUT)

# PWM初期値(50Hz, デューティ比0%で開始)
pwmL = GPIO.PWM(LEFT_PWM, 50); pwmL.start(0)
pwmR = GPIO.PWM(RIGHT_PWM, 50); pwmR.start(0)

# ゲームパッド初期値(状態保持)
DEADZONE = 0.2                                      # 誤差を無視するため
current_state = {"stick": [], "buttons": []}
last_received = time.time()

# ---------------------------
# 関数定義: ラズタンクを前後左右に移動させる命令
# ---------------------------
def move(l1, l2, r1, r2, speed):
    GPIO.output(LEFT_IN1, l1); GPIO.output(LEFT_IN2, l2)     # 左モーター IN1 IN2
    GPIO.output(RIGHT_IN1, r1); GPIO.output(RIGHT_IN2, r2)   # 右モーター IN1 IN2
    pwmL.ChangeDutyCycle(speed); pwmR.ChangeDutyCycle(speed) # スピード(0~100%)

# ラズタンクを停止する命令
def stop(): move(0,0,0,0,0)

# pythonプログラムを終了するときの初期化
def end(): stop(); pwmL.stop(); pwmR.stop(); GPIO.cleanup(); sys.exit(0)

# ゲームパッド入力を安定させる(暴走しないように安全対策)
def get_active_state():
    now = time.time()
    if now - last_received < 1.0:
        return current_state                  # 1秒以内にゲームパッド入力あった場合
    elif now - last_received > 3.0:
        return {"stick": [], "buttons": []}   # 3秒以上、ゲームパッド入力がなければ停止
    else:
        return current_state                  # 入力値を記憶

# ノートPCからの WebSocket 受信コード
async def handler(ws):
    global current_state, last_received
    async for msg in ws:                      # 受信したデータをmsgに代入
        try:
            data = json.loads(msg)            # json形式に変換しdataに代入
            axes = data.get("axes", [0]*4)    # dataからaxes値を取り出す. [0,1,2,3]=左X軸, 左Y軸, 右X軸, 右Y軸
            buttons = data.get("buttons", []) # dataからbuttons値を取り出す. [False,True,False,True...]=全部
            x, y = axes[0], axes[1]           # 左スティックで移動するのでx,yを定義
            dirs = []                         # 空の配列dirsを初期定義
            if abs(x) > DEADZONE: dirs.append("←" if x < 0 else "→") # dirs配列に[←]or[→]を代入
            if abs(y) > DEADZONE: dirs.append("↑" if y < 0 else "↓") # dirs配列に[↑]or[↓]を代入
            pressed = [i for i, b in enumerate(buttons) if b]    # 対で取り出す(0,False),(1,True),(2,False). bはTrueの対のみ対象
            current_state = {"stick": dirs, "buttons": pressed}  # ゲームパッド入力を記憶
            last_received = time.time()                          # 現在時刻を記憶
        except: pass

# ---------------------------
# 関数定義: 最初に実行する命令(メイン)
# ---------------------------
async def main():
    print("🎮 WebSocketサーバー起動中(ポート8765)...")
    print("▶️ startボタンでpythonコードを終了")
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await asyncio.Future()              # ずっと待ち受け(終了しないようにするためのダミー待機)

# ---------------------------
# 最初に実行される命令: WebSocketサーバーをバックグラウンドで起動(メインループと並行して動かす)
# ---------------------------
threading.Thread(target=lambda: asyncio.run(main()), daemon=True).start()

# ---------------------------
# ずっと繰り返す: ラズタンクを動かすループ. (Start)ボタンが押されるまで.
# ---------------------------
try:
    while True:
        s = get_active_state()
        dirs = s["stick"]
        btns = s["buttons"]

        if "←" in dirs:   move(0,1,1,0, 75)  # モーター[↓][↑] 速度 75%
        elif "→" in dirs: move(1,0,0,1, 75)  # モーター[↑][↓] 速度 75%
        elif "↑" in dirs: move(1,0,1,0,100)  # モーター[↑][↑] 速度100%
        elif "↓" in dirs: move(0,1,0,1,100)  # モーター[↓][↓] 速度100%
        else: stop()                         # その他のとき停止

        if 9 in btns: end()  # ボタン9 = [start]
        time.sleep(0.2)

except KeyboardInterrupt:
    print("KeyboardInterruptで終了する")
    end()

ラズタンクで動作検査

手順1. ラズタンクでpythonコードを実行する

$ python tank_gamepad.py

手順2. ノートPCブラウザでアクセス

http://razpi**.local/gamepad.html

手順3. ゲームパッドで運転できるか検査

ゲームパッドの左スティックで、ラズタンクを前後・左右に移動できるはずだ。[start]ボタンでpythonコードを停止する。

コメントを残す