본문으로 건너뛰기

Python pybluez

socket

파이썬에 기본 내장되어있는 socket 모듈을 통해 간단한 블루투스 통신을 구축 할 수 있습니다.

단 참고할 문서가 거의 없기 때문에 단순 통신 기능 외에는 프로그램 개발이 어려울 수 있습니다.

pybluez 설치

sudo apt install -y bluez bluetooth libbluetooth-dev python3-pip \
&& python3 -m pip install pybluez

BLE 추가 설치

sudo apt install -y pkg-config libboost-python-dev \
libboost-thread-dev libglib2.0-dev python3-dev \
&& python3 -m pip install pybluez\[ble\]

bluetooth.service 변경

실행 옵션으로 -C(--compat)를 추가합니다.

$ sudo vim /lib/systemd/system/bluetooth.service
...
ExecStart=/usr/lib/bluetooth/bluetoothd -C
...

SDP: Service Discovery Protocol

echo -e '\nsudo chmod 777 /var/run/sdp' >> ~/.bashrc
sudo reboot

블루투스 컨트롤러

아래 명령을 통해 블루투스 장치명과 MAC 주소를 확인합니다.

$ hcitool dev
Devices:
hci0 00:1A:7D:DA:71:13

아래 명령을 통해 블루투스 장치인 hci0의 스캔을 허용할 수 있습니다.

sudo hciconfig hci0 piscan

bluetoothctl

bluetoothctl 명령어를 통해 블루투스 장치를 제어할 수 있습니다.

$ bluetoothctl
[NEW] Controller 18:1D:EA:36:DA:89 hhk7734 [default]
Agent registered
[bluetooth]# help
Menu main:
Available commands:
-------------------
advertise Advertise Options Submenu
scan Scan Options Submenu
gatt Generic Attribute Submenu
list List available controllers
show [ctrl] Controller information
select <ctrl> Select default controller
devices List available devices
paired-devices List paired devices
system-alias <name> Set controller alias
reset-alias Reset controller alias
power <on/off> Set controller power
pairable <on/off> Set controller pairable mode
discoverable <on/off> Set controller discoverable mode
agent <on/off/capability> Enable/disable agent with given capability
default-agent Set agent as the default one
advertise <on/off/type> Enable/disable advertising with given type
set-alias <alias> Set device alias
scan <on/off> Scan for devices
info [dev] Device information
pair [dev] Pair with device
trust [dev] Trust device
untrust [dev] Untrust device
block [dev] Block device
unblock [dev] Unblock device
remove <dev> Remove device
connect <dev> Connect device
disconnect [dev] Disconnect device
menu <name> Select submenu
version Display version
quit Quit program
exit Quit program
help Display help about this program
[bluetooth]#
정보

정보가 안나오면 sudo bluetoothctl 명령어를 통해 블루투스 장치를 제어할 수 있습니다.

Examples

server - client

server

import bluetooth
import signal
import sys

HOST = "" # '블루투스 컨트롤러 맥 주소'를 직접 입력해도 됨
PORT = bluetooth.PORT_ANY
UUID = "94f39d29-7d6d-437d-973b-fba39e49d4ee"


def signal_handler(sig, frame):
try:
connected_socket.close()

except:
pass

server_socket.close()
sys.exit()


signal.signal(signal.SIGINT, signal_handler)

# 블루투스 서버 소켓 생성
server_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
server_socket.bind((HOST, PORT))
server_socket.listen(1)

port = server_socket.getsockname()[1]
print("포트 :", port)

# 블루투스 서비스 advertise
bluetooth.advertise_service(
server_socket,
name="server",
service_id=UUID,
service_classes=[UUID, bluetooth.SERIAL_PORT_CLASS],
profiles=[bluetooth.SERIAL_PORT_PROFILE],
)

# 클라이언트 접속 대기
connected_socket, client_address = server_socket.accept()

try:
while True:
data = connected_socket.recv(1024)
print("client : ", data)
connected_socket.send(data)

except:
pass

connected_socket.close()
server_socket.close()

client

import bluetooth
import signal
import sys

# 서버 정보
HOST = "" # 블루투스 컨트롤러 맥 주소
UUID = "94f39d29-7d6d-437d-973b-fba39e49d4ee"


def signal_handler(sig, frame):
client_socket.close()
sys.exit()


signal.signal(signal.SIGINT, signal_handler)

# port는 모르고 name이나 uuid를 아는 경우
# address 를 설정하고 나머지를 None으로 하는 경우 결과가 여러개 나올 수 있음
# name=None은 None인 결과를 찾는 것이 아님. None이 아니더라도 나머지 정보만 일치하면 결과에 포함 됨
# 결과는, 'host', 'name', 'description', 'provider',
# 'protocol', 'service-classes', 'profiles', 'service_id'를 키로하는 딕셔너리
# address와 다른 하나의 정보만 알면 거의 하나로 특정지어짐
services = bluetooth.find_service(name=None, uuid=UUID, address=HOST)

if len(services) == 0:
print("장치를 찾지 못했습니다.")
sys.exit()

service = services[0]
port = service["port"]
print("포트 :", port)

# 블루투스 클라이언트 소켓 생성
client_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)

# 서버 접속
client_socket.connect((HOST, port))

try:
while True:
data = input("client : ")
if data:
client_socket.send(data.encode())
data = client_socket.recv(1024)
print("server :", data)

except:
pass

client_socket.close()

송수신 쓰레드

import bluetooth
import threading
import queue


class Bluetooth_socket(threading.Thread):
def __init__(self):
super().__init__()
self._host = ""
self._port = bluetooth.PORT_ANY
self._uuid = "00000001-0000-1000-8000-00805F9B34FB"

# 수신 데이터 저장용 큐
self._recv_queue = queue.Queue()
self.daemon = True

# 블루투스 서버 소켓 생성
self._socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self._socket.bind((self._host, self._port))
self._socket.listen(1)
bluetooth.advertise_service(
self._socket,
name="bluetooth_server",
service_id=self._uuid,
service_classes=[self._uuid, bluetooth.SERIAL_PORT_CLASS],
profiles=[bluetooth.SERIAL_PORT_PROFILE],
)

self._connection_start_callback = None
self._connection_stop_callback = None

self._is_connected = False

def run(self):
while True:
# 블루투스 연결 대기
self._connected_socket, client_address = self._socket.accept()
self._is_connected = True

self._start_callback()

try:
while True:
data = self._connected_socket.recv(1024)
self._recv_queue.put(data)

except:
pass

self._stop_callback()

self._is_connected = False
self._connected_socket.close()

def close(self):
self._connected_socket.close()
self._socket.close()

# 수신 데이터
def recv(self):
try:
data = self._recv_queue.get(block=False)

except queue.Empty:
return None

return data

# 송신 데이터
def send(self, data):
if self._is_connected:
self._connected_socket.send(data)

# 연결 시작 시 호출되는 콜백 설정
def set_connection_start_callback(self, callback=None, args=()):
self._connection_start_callback = callback
self._connection_start_callback_args = args

def _start_callback(self):
if self._connection_start_callback is not None:
self._connection_start_callback(
*self._connection_start_callback_args
)

# 연결 종료 시 호출되는 콜백 설정
def set_connection_stop_callback(self, callback=None, args=()):
self._connection_stop_callback = callback
self._connection_stop_callback_args = args

def _stop_callback(self):
if self._connection_stop_callback is not None:
self._connection_stop_callback(*self._connection_stop_callback_args)

def is_connected(self):
return self._is_connected

Reference