Raspberry Pi 作品集
Python + Flask + socket + multiprocessing
受信してDBに書込み と DBからグラフ表示を合体し並列処理
Python + Flask + socket + multiprocessing
受信してDBに書込み と DBからグラフ表示を合体し並列処理
multiprocessing が必要となった背景:
● Rasp Pi で計測した温度データを、他の Rasp Pi に Wi-Fiで送信し、受信側の Rasp Piでは、受け取ったデータをデータベースに蓄積する「IoT」ライクなシステムを構築した。
● Rasp Pi で計測した温度データを、他の Rasp Pi に Wi-Fiで送信し、受信側の Rasp Piでは、受け取ったデータをデータベースに蓄積する「IoT」ライクなシステムを構築した。
● 受信側の Rasp Piでは、受信してデータベースに書き込むだけでなく、
蓄積されて行く温度データを「リアルタイムで参照」できる機能が必要となる。
蓄積されて行く温度データを「リアルタイムで参照」できる機能が必要となる。
● このためには、
1⃣.データを受信し、データベースに蓄積する「Pythonプログラム」と、
2⃣.データベースを読込んで、グラフ表示する「Pythonプログラム」の、
2つのPythonを同時に稼働させる必要がある。
1⃣.データを受信し、データベースに蓄積する「Pythonプログラム」と、
2⃣.データベースを読込んで、グラフ表示する「Pythonプログラム」の、
2つのPythonを同時に稼働させる必要がある。
★ そこで、これらの「2つのPythonを合体」させ、並列処理させる「multiprocessing」を実装してみる事にした。
スポンサー リンク
目 次
1. 2つのプロセスを並列処理させるプロがラムの構成
2つのプロセスを並列処理させるプロがラムの構成。
# multiprocessingモジュールの読み込み
from multiprocessing import Process
# Flaskインスタンス作成
app = Flask(__name__)
# 並列処理-1 DBを読んでグラフ表示
@app.route('/')
def index():
# ここに、DBを読んでグラフ表示する処理を記述
# 並列処理-2 データの受信とDB書込み
def jusin2db():
# ここに、データを受信し、データベースに書込む処理を記述
# ------------------------------------------
# 並列処理-1を起動するルーチン
def db2graph(**kwargs):
app.run(**kwargs)
if __name__ == '__main__':
server = Process(target = db2graph, kwargs = {'host': '0.0.0.0', 'port': 5001, 'threaded': True})
server.start()
jusin2db()
def index():
ここに、データを受信し、データベースに書込む処理を記述。
ここに、データを受信し、データベースに書込む処理を記述。
def jusin2db():
ここに、データを受信し、データベースに書込む処理を記述
ここに、データを受信し、データベースに書込む処理を記述
リアルタイムでブラウザに表示する Pythonは、 Flaskを利用したWebアプリなので、flaskを起動したのちに socketでの受信処理を立ち上げる。
2. 並列処理に関する記述要領
3. 実装したPythonプログラム/strong>
1⃣.データを受信し、データベースに蓄積する「Pythonプログラム」と、
2⃣.データベースを読込んで、グラフ表示する「Pythonプログラム」の、
2つのPythonを合体させ「multiprocessing」として実装。
(jusin-graph_01.py)
#!/usr/bin/python
# -*- coding: utf-8 -*
import socket
import datetime #日付ライブラリ読み込み
import json
import MySQLdb #DBライブラリの読み込み
from flask import Flask, render_template
from multiprocessing import Process
# -----------------------------------------------
# Flaskインスタンス作成
app = Flask(__name__)
# -----------------------------------------------
# 並列処理-1 DBを読んでグラフ表示
@app.route('/')
def index():
# 2次元リストを定義
temp_list = []
# MySQLに接続
connector = MySQLdb.connect(
host = "localhost",
db = "ondodb",
user = "flaskpy",
passwd = "dht22x4",
# テーブル内部で日本語を扱うために追加
charset = "utf8"
)
# カーソル取得
cursor = connector.cursor()
# SQLクエリ実行(データの読込)
sql = "select * from meas_value where DATE_ADD(nitiji, INTERVAL 24 HOUR) > NOW()"
cursor.execute(sql)
# 結果を取得し、2次元リストの形式に変換
records = cursor.fetchall()
for record in records:
temp_list.append(
{'nitiji':record[0].strftime("%Y-%m-%d %H:%M"),
'temp1':record[1],
'temp2':record[2],
'temp3':record[3],
'temp4':record[4]}
)
# データベースとの接続を閉じる
cursor.close()
connector.close()
#テンプレートへ挿入するデータの作成
title = "Raspberry Pi で測定した温度の推移(24H)グラフ"
return render_template('template.html', title=title, temp_list=temp_list)
# 並列処理-2 受信とDB書込み
def jusin2db():
# サーバーのIPアドレスとポート
server_ip = "192.168.11.112"
server_port = 5000
# ソケットを作成
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# ソケットを指定のIPアドレスとポートにバインド
server_socket.bind((server_ip, server_port))
# 接続待機
server_socket.listen(1) # 同時に接続できるクライアントの数
print(f"サーバーが {server_ip}:{server_port} で待機中...")
my_dict = {}
# データの受信と表示
while True:
# クライアントからの接続を待機
client_socket, client_address = server_socket.accept()
print(f"クライアント {client_address} が接続しました。")
data = client_socket.recv(1024) # 最大1024バイトを受け入れる
if not data:
break
print(f"受信したデータ: {data.decode('utf-8')}")
my_dict = json.loads(data.decode('utf-8'))
print(my_dict)
# 各要素を取り出す
value_1 = my_dict['1']
value_2 = my_dict["2"]
value_3 = my_dict["3"]
value_4 = my_dict["4"]
value_5 = my_dict["5"]
client_socket.close()
# DB書き込み:MySQL接続------------------------------------
connector = MySQLdb.connect(
host = "localhost",
db = "ondodb",
user = "flaskpy",
passwd = "dht22x4",
# テーブル内部で日本語を扱うために追加
charset = "utf8"
)
# カーソル取得
cursor = connector.cursor()
# SQLクエリ実行(データ追加)
sql = "insert into meas_value (nitiji, temp1, temp2, temp3, temp4) VALUES (%s, %s, %s, %s, %s)"
val = (value_1, value_2, value_3, value_4, value_5)
cursor.execute(sql, val)
connector.commit()
# カーソル終了
cursor.close()
# MySQL切断
connector.close()
# DB書込み終了---------------------------------------------
# 並列処理-1 DBを読んでグラフ表示を起動
def db2graph(**kwargs):
app.run(**kwargs)
if __name__ == '__main__':
server = Process(target = db2graph, kwargs = {'host': '0.0.0.0', 'port': 5001, 'threaded': True})
server.start()
jusin2db()
上記例では、 全ての処理をコーディングしたが、単独で実行していた既存の
【データを受信し、データベースに蓄積する「Pythonプログラム」jusin_05.py】を読み込む方法(import)で、「並列処理-2」を実行しても問題なく稼働する。
【データを受信し、データベースに蓄積する「Pythonプログラム」jusin_05.py】を読み込む方法(import)で、「並列処理-2」を実行しても問題なく稼働する。
# ---省略--------------------------------------
import jusin_05
# ---省略--------------------------------------
# 並列処理-2 受信とDB書込み
def jusin2db():
jusin_05.start_server()
# ---省略--------------------------------------
参考:
4. 実行結果
合体したPythonを実行してみる。
python3 work/jusin-graph_01.py
「サーバーが 192.168.11.112:5000 で待機中...」と
「* Running on http://0.0.0.0:5001/ (Press CTRL+C to quit)」の
2つが表示された。
「* Running on http://0.0.0.0:5001/ (Press CTRL+C to quit)」の
2つが表示された。
nohupをつけてコマンドを実行する。(コマンドの最後にアンパサンド(&)をつけると、バックグラウンドで起動できる。)
nohup python3 work/jusin-graph_01.py &
プロセス番号「5054」が表示される。
実行中のプロセスを表示してみる。
ps aux | grep "python"
実行した「jusin-graph_01.py」で「5054」と「5055」の2つのプロセスが動いている。
以上。
(2024.01.29)
(2024.01.29)
スポンサー リンク