USBゲームパッドが接続されたノートPC側から、ラズパイに「レバーやボタン操作」のコマンドを送信する。
また、ラズパイ側から、ノートPCに「レバーやボタン操作」を受信できたことを折り返し返信する。
前回は片方向通信だったが、今回は相互に通信できるようにコードを改造する。
まずはノートPC側とラズパイ側の2つの新規ファイルを作成する。
ノートPC側プログラム[controller2.html]
<!DOCTYPE html>
<html lang="ja">
<head>
<title>Gamepad to Raztank</title>
<meta charset="UTF-8">
<style>
html, body {
margin: 0;
padding: 0;
height: 100vh; /*画面の高さに合わせる */
/* width: 100vw; 画面の幅に合わせる */
overflow: auto; /* スクロールバーを表示|hiddenなら消す */
}
img {
width: 50vw; /* ここを変更すると画面の横幅を大小変更できる */
/*height: 100vh;*/
object-fit: cover; /* 引き伸ばし表示 */
display: block;
}
</style>
</head>
<body>
<h2>Gamepad Input for Raztank Python</h2>
<p>
Gamepad: <span id="gamepad"></span><br>
WebSocket: <span id="websockets"></span><br>
Operation: <span id="operation"></span>
</p>
<pre id="output"></pre>
<!-- <iframe src="http://razpi43.local:8080/?action=stream" width="640" height="480" frameborder="0" allowfullscreen></iframe> -->
<img src="http://razpi43.local:8080/?action=stream" alt="MJPEG Stream">
<!-- REV3 追加 -->
<p>ラズパイから受信:</p>
<div id="dashboard"></div>
<!-- ここから JavaScript プログラム -->
<script>
let socket;
let lastData = null; // 前回送信したデータを保持
// WebSocketイベントが発生したら表示変更
const wsStatus = document.getElementById("websockets");
function connectWebSocket() {
socket = new WebSocket("ws://razpi43.local:8765/gamepad");
socket.onopen = () => wsStatus.textContent = "接続成功";
socket.onerror = () => wsStatus.textContent = "エラー";
socket.onclose = () => {
wsStatus.textContent = "切断";
setTimeout(connectWebSocket, 3000); // 1秒毎に接続トライ
}
// ラズパイからWebSocket受信イベント
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
updateDashboard(data);
};
}
function pollGamepad() {
const gp = navigator.getGamepads()[0];
if (gp) {
const DEADZONE = 0.2; // HTML側でもデッドゾーンを設定
const axes = gp.axes.map(v => (Math.abs(v) < DEADZONE ? 0 : v));
const data = {
buttons: gp.buttons.map(b => b.pressed),
axes: axes
};
const jsonData = JSON.stringify(data);
document.getElementById("output").textContent = jsonData;
if (socket && socket.readyState === WebSocket.OPEN && jsonData !== lastData) {
socket.send(jsonData);
lastData = jsonData;
}
}
requestAnimationFrame(pollGamepad);
}
window.addEventListener("gamepadconnected", () => {
document.getElementById("gamepad").textContent = "接続成功";
pollGamepad();
});
// 初回websocket接続
connectWebSocket();
// REV3 追加(dashboardを描画する関数)
function updateDashboard(data) {
const dash = document.getElementById("dashboard");
dash.innerHTML = `
<p>Stick: ${data.stick.join(", ")}</p>
<p>Buttons: ${data.buttons.join(", ")}</p>
`;
}
// ウインドウがアクティブになったとき
window.onfocus = () => {
document.getElementById("operation").textContent = "ON";
};
// ウインドウが非アクティブになったとき
window.onblur = () => {
document.getElementById("operation").textContent = "OFF";
};
</script>
<!-- JavaScriptプログラムはここまで -->
</body>
</html>ラズパイ側プログラム[tank_gamepad2.py]
# ------------------------------------------
# ゲームパッドを接続したPC(html)と通信するラズタンク運転アプリ
# ------------------------------------------
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio, websockets, json, sys, threading, time
import RPi.GPIO as GPIO
from flask import Flask, send_from_directory
# ---------------------------
# Flaskサーバー設定(controller2.htmlを返す)
# ---------------------------
app = Flask(__name__)
@app.route("/")
def index():
return send_from_directory(".", "controller2.html")
def run_flask():
app.run(host="0.0.0.0", port=5000, debug=False)
# ---------------------------
# GPIO設定
# ---------------------------
LEFT_PWM, LEFT_IN1, LEFT_IN2 = 22, 17, 27
RIGHT_PWM, RIGHT_IN1, RIGHT_IN2 = 16, 21, 20
GPIO.setmode(GPIO.BCM)
for pin in [LEFT_PWM, LEFT_IN1, LEFT_IN2, RIGHT_PWM, RIGHT_IN1, RIGHT_IN2]:
GPIO.setup(pin, GPIO.OUT)
pwmL = GPIO.PWM(LEFT_PWM, 50); pwmL.start(0)
pwmR = GPIO.PWM(RIGHT_PWM, 50); pwmR.start(0)
# ---------------------------
# モーター制御関数
# ---------------------------
def move(l1, l2, r1, r2, speed):
GPIO.output(LEFT_IN1, l1); GPIO.output(LEFT_IN2, l2)
GPIO.output(RIGHT_IN1, r1); GPIO.output(RIGHT_IN2, r2)
pwmL.ChangeDutyCycle(speed); pwmR.ChangeDutyCycle(speed)
def stop():
move(0,0,0,0,0)
def end():
stop(); pwmL.stop(); pwmR.stop(); GPIO.cleanup(); sys.exit(0)
# ---------------------------
# WebSocket受信処理
# ---------------------------
async def handler(ws):
async for msg in ws:
try:
data = json.loads(msg)
axes = data.get("axes", [0,0,0,0])
buttons = data.get("buttons", [])
x, y = axes[0], axes[1]
dirs = []
if x < 0: dirs.append("←")
elif x > 0: dirs.append("→")
if y < 0: dirs.append("↑")
elif y > 0: dirs.append("↓")
# モーター制御
if "←" in dirs: move(0,1,1,0,75)
elif "→" in dirs: move(1,0,0,1,75)
elif "↑" in dirs: move(1,0,1,0,100)
elif "↓" in dirs: move(0,1,0,1,100)
else: stop()
# ブラウザに返信(dashboard用)
await ws.send(json.dumps({"stick": dirs, "buttons": buttons}))
# Startボタンで終了
if len(buttons) > 9 and buttons[9]:
end()
except Exception as e:
print("Error:", e)
# ---------------------------
# WebSocketサーバー起動
# ---------------------------
async def main():
print("🎮 WebSocketサーバー起動中(ポート8765)...")
async with websockets.serve(handler, "0.0.0.0", 8765):
await asyncio.Future()
# ---------------------------
# メイン処理
# ---------------------------
if __name__ == "__main__":
# Flaskをバックグラウンドで起動
threading.Thread(target=run_flask, daemon=True).start()
# WebSocketサーバー起動
asyncio.run(main())実行手順
ラズパイ側はバックエンドアプリとして起動する
$ python tank_gamepad2.py下記表示されたら成功
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)ノートPC側はフロントWEBとしてアクセスする
ブラウザを開いて
http://razpi43.local:5000/通信接続テスト
USBゲームパッドを接続したノートPCとラズパイが相互通信できることを検査する。
