Python pybluez
socket#
ํ์ด์ฌ์ ๊ธฐ๋ณธ ๋ด์ฅ๋์ด์๋ socket ๋ชจ๋์ ํตํด ๊ฐ๋จํ ๋ธ๋ฃจํฌ์ค ํต์ ์ ๊ตฌ์ถ ํ ์ ์์ต๋๋ค.
๋จ ์ฐธ๊ณ ํ ๋ฌธ์๊ฐ ๊ฑฐ์ ์๊ธฐ ๋๋ฌธ์ ๋จ์ ํต์ ๊ธฐ๋ฅ ์ธ์๋ ํ๋ก๊ทธ๋จ ๊ฐ๋ฐ์ด ์ด๋ ค์ธ ์ ์์ต๋๋ค.
pybluez ์ค์น#
sudo apt install -y bluez bluetooth libbluetooth-dev python3-pip \
&& python3 -m pip install pybluez
BLE ์ถ๊ฐ ์ค์น#
sudo apt install -y pkg-config libboost-python-dev \
libboost-thread-dev libglib2.0-dev python3-dev \
&& python3 -m pip install pybluez\[ble\]
bluetooth.service ๋ณ๊ฒฝ#
์คํ ์ต์
์ผ๋ก -C
(--compat)๋ฅผ ์ถ๊ฐํฉ๋๋ค.
$ sudo vim /lib/systemd/system/bluetooth.service
...
ExecStart=/usr/lib/bluetooth/bluetoothd -C
...
SDP: Service Discovery Protocol
echo -e '\nsudo chmod 777 /var/run/sdp' >> ~/.bashrc
sudo reboot
๋ธ๋ฃจํฌ์ค ์ปจํธ๋กค๋ฌ#
์๋ ๋ช ๋ น์ ํตํด ๋ธ๋ฃจํฌ์ค ์ฅ์น๋ช ๊ณผ MAC ์ฃผ์๋ฅผ ํ์ธํฉ๋๋ค.
$ hcitool dev
Devices:
hci0 00:1A:7D:DA:71:13
์๋ ๋ช ๋ น์ ํตํด ๋ธ๋ฃจํฌ์ค ์ฅ์น์ธ hci0์ ์ค์บ์ ํ์ฉํ ์ ์์ต๋๋ค.
sudo hciconfig hci0 piscan
bluetoothctl#
bluetoothctl
๋ช
๋ น์ด๋ฅผ ํตํด ๋ธ๋ฃจํฌ์ค ์ฅ์น๋ฅผ ์ ์ดํ ์ ์์ต๋๋ค.
$ bluetoothctl
[NEW] Controller 18:1D:EA:36:DA:89 hhk7734 [default]
Agent registered
[bluetooth]# help
Menu main:
Available commands:
-------------------
advertise Advertise Options Submenu
scan Scan Options Submenu
gatt Generic Attribute Submenu
list List available controllers
show [ctrl] Controller information
select <ctrl> Select default controller
devices List available devices
paired-devices List paired devices
system-alias <name> Set controller alias
reset-alias Reset controller alias
power <on/off> Set controller power
pairable <on/off> Set controller pairable mode
discoverable <on/off> Set controller discoverable mode
agent <on/off/capability> Enable/disable agent with given capability
default-agent Set agent as the default one
advertise <on/off/type> Enable/disable advertising with given type
set-alias <alias> Set device alias
scan <on/off> Scan for devices
info [dev] Device information
pair [dev] Pair with device
trust [dev] Trust device
untrust [dev] Untrust device
block [dev] Block device
unblock [dev] Unblock device
remove <dev> Remove device
connect <dev> Connect device
disconnect [dev] Disconnect device
menu <name> Select submenu
version Display version
quit Quit program
exit Quit program
help Display help about this program
[bluetooth]#
info
์ ๋ณด๊ฐ ์๋์ค๋ฉด sudo bluetoothctl
๋ช
๋ น์ด๋ฅผ ํตํด ๋ธ๋ฃจํฌ์ค ์ฅ์น๋ฅผ ์ ์ดํ ์ ์์ต๋๋ค.
Examples#
server - client#
server#
import bluetooth
import signal
import sys
HOST = "" # '๋ธ๋ฃจํฌ์ค ์ปจํธ๋กค๋ฌ ๋งฅ ์ฃผ์'๋ฅผ ์ง์ ์
๋ ฅํด๋ ๋จ
PORT = bluetooth.PORT_ANY
UUID = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
def signal_handler(sig, frame):
try:
connected_socket.close()
except:
pass
server_socket.close()
sys.exit()
signal.signal(signal.SIGINT, signal_handler)
# ๋ธ๋ฃจํฌ์ค ์๋ฒ ์์ผ ์์ฑ
server_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
server_socket.bind((HOST, PORT))
server_socket.listen(1)
port = server_socket.getsockname()[1]
print("ํฌํธ :", port)
# ๋ธ๋ฃจํฌ์ค ์๋น์ค advertise
bluetooth.advertise_service(
server_socket,
name="server",
service_id=UUID,
service_classes=[UUID, bluetooth.SERIAL_PORT_CLASS],
profiles=[bluetooth.SERIAL_PORT_PROFILE],
)
# ํด๋ผ์ด์ธํธ ์ ์ ๋๊ธฐ
connected_socket, client_address = server_socket.accept()
try:
while True:
data = connected_socket.recv(1024)
print("client : ", data)
connected_socket.send(data)
except:
pass
connected_socket.close()
server_socket.close()
client#
import bluetooth
import signal
import sys
# ์๋ฒ ์ ๋ณด
HOST = "" # ๋ธ๋ฃจํฌ์ค ์ปจํธ๋กค๋ฌ ๋งฅ ์ฃผ์
UUID = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
def signal_handler(sig, frame):
client_socket.close()
sys.exit()
signal.signal(signal.SIGINT, signal_handler)
# port๋ ๋ชจ๋ฅด๊ณ name์ด๋ uuid๋ฅผ ์๋ ๊ฒฝ์ฐ
# address ๋ฅผ ์ค์ ํ๊ณ ๋๋จธ์ง๋ฅผ None์ผ๋ก ํ๋ ๊ฒฝ์ฐ ๊ฒฐ๊ณผ๊ฐ ์ฌ๋ฌ๊ฐ ๋์ฌ ์ ์์
# name=None์ None์ธ ๊ฒฐ๊ณผ๋ฅผ ์ฐพ๋ ๊ฒ์ด ์๋. None์ด ์๋๋๋ผ๋ ๋๋จธ์ง ์ ๋ณด๋ง ์ผ์นํ๋ฉด ๊ฒฐ๊ณผ์ ํฌํจ ๋จ
# ๊ฒฐ๊ณผ๋, 'host', 'name', 'description', 'provider',
# 'protocol', 'service-classes', 'profiles', 'service_id'๋ฅผ ํค๋กํ๋ ๋์
๋๋ฆฌ
# address์ ๋ค๋ฅธ ํ๋์ ์ ๋ณด๋ง ์๋ฉด ๊ฑฐ์ ํ๋๋ก ํน์ ์ง์ด์ง
services = bluetooth.find_service(name=None, uuid=UUID, address=HOST)
if len(services) == 0:
print("์ฅ์น๋ฅผ ์ฐพ์ง ๋ชปํ์ต๋๋ค.")
sys.exit()
service = services[0]
port = service["port"]
print("ํฌํธ :", port)
# ๋ธ๋ฃจํฌ์ค ํด๋ผ์ด์ธํธ ์์ผ ์์ฑ
client_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
# ์๋ฒ ์ ์
client_socket.connect((HOST, port))
try:
while True:
data = input("client : ")
if data:
client_socket.send(data.encode())
data = client_socket.recv(1024)
print("server :", data)
except:
pass
client_socket.close()
์ก์์ ์ฐ๋ ๋#
import bluetooth
import threading
import queue
class Bluetooth_socket(threading.Thread):
def __init__(self):
super().__init__()
self._host = ""
self._port = bluetooth.PORT_ANY
self._uuid = "00000001-0000-1000-8000-00805F9B34FB"
# ์์ ๋ฐ์ดํฐ ์ ์ฅ์ฉ ํ
self._recv_queue = queue.Queue()
self.daemon = True
# ๋ธ๋ฃจํฌ์ค ์๋ฒ ์์ผ ์์ฑ
self._socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self._socket.bind((self._host, self._port))
self._socket.listen(1)
bluetooth.advertise_service(
self._socket,
name="bluetooth_server",
service_id=self._uuid,
service_classes=[self._uuid, bluetooth.SERIAL_PORT_CLASS],
profiles=[bluetooth.SERIAL_PORT_PROFILE],
)
self._connection_start_callback = None
self._connection_stop_callback = None
self._is_connected = False
def run(self):
while True:
# ๋ธ๋ฃจํฌ์ค ์ฐ๊ฒฐ ๋๊ธฐ
self._connected_socket, client_address = self._socket.accept()
self._is_connected = True
self._start_callback()
try:
while True:
data = self._connected_socket.recv(1024)
self._recv_queue.put(data)
except:
pass
self._stop_callback()
self._is_connected = False
self._connected_socket.close()
def close(self):
self._connected_socket.close()
self._socket.close()
# ์์ ๋ฐ์ดํฐ
def recv(self):
try:
data = self._recv_queue.get(block=False)
except queue.Empty:
return None
return data
# ์ก์ ๋ฐ์ดํฐ
def send(self, data):
if self._is_connected:
self._connected_socket.send(data)
# ์ฐ๊ฒฐ ์์ ์ ํธ์ถ๋๋ ์ฝ๋ฐฑ ์ค์
def set_connection_start_callback(self, callback=None, args=()):
self._connection_start_callback = callback
self._connection_start_callback_args = args
def _start_callback(self):
if self._connection_start_callback is not None:
self._connection_start_callback(
*self._connection_start_callback_args
)
# ์ฐ๊ฒฐ ์ข
๋ฃ ์ ํธ์ถ๋๋ ์ฝ๋ฐฑ ์ค์
def set_connection_stop_callback(self, callback=None, args=()):
self._connection_stop_callback = callback
self._connection_stop_callback_args = args
def _stop_callback(self):
if self._connection_stop_callback is not None:
self._connection_stop_callback(*self._connection_stop_callback_args)
def is_connected(self):
return self._is_connected