HOME → 1 Raspberry Pi → 03 作品集 → 

2つのPythonを合体し 並列処理させる「multiprocessing」の実装

Raspberry Pi 作品集
Python + Flask + socket + multiprocessing
受信してDBに書込み と DBからグラフ表示を合体し並列処理
 
multiprocessing が必要となった背景:
● Rasp Pi で計測した温度データを、他の Rasp Pi に Wi-Fiで送信し、受信側の Rasp Piでは、受け取ったデータをデータベースに蓄積する「IoT」ライクなシステムを構築した。
 
● 受信側の Rasp Piでは、受信してデータベースに書き込むだけでなく、
蓄積されて行く温度データを「リアルタイムで参照」できる機能が必要となる。
 
● このためには、
1⃣.データを受信し、データベースに蓄積する「Pythonプログラム」と、
2⃣.データベースを読込んで、グラフ表示する「Pythonプログラム」の、
2つのPythonを同時に稼働させる必要がある。
 
★ そこで、これらの「2つのPythonを合体」させ、並列処理させる「multiprocessing」を実装してみる事にした。
 
実行中のプロセスを表示
 
 

 

スポンサー リンク

 

 
 
 
 
 
1. 2つのプロセスを並列処理させるプロがラムの構成
 
2つのPythonを合体し 並列処理させる「multiprocessing」の実装イメージ。
2つのPythonを合体し 並列処理させる「multiprocessing」の実装
 
2つのプロセスを並列処理させるプロがラムの構成。
# multiprocessingモジュールの読み込み
from multiprocessing import Process

# Flaskインスタンス作成
app = Flask(__name__)

# 並列処理-1 DBを読んでグラフ表示
@app.route('/')
def index():

	# ここに、DBを読んでグラフ表示する処理を記述

# 並列処理-2 データの受信とDB書込み
def jusin2db():

	# ここに、データを受信し、データベースに書込む処理を記述

# ------------------------------------------

# 並列処理-1を起動するルーチン
def db2graph(**kwargs):
    app.run(**kwargs)


if __name__ == '__main__':
    server = Process(target = db2graph, kwargs = {'host': '0.0.0.0', 'port': 5001, 'threaded': True})
    server.start()

    jusin2db()
 
def index():
ここに、データを受信し、データベースに書込む処理を記述。
 
def jusin2db():
ここに、データを受信し、データベースに書込む処理を記述
 
リアルタイムでブラウザに表示する Pythonは、 Flaskを利用したWebアプリなので、flaskを起動したのちに socketでの受信処理を立ち上げる。
 
 
 
2. 並列処理に関する記述要領
 
 「multiprocessingモジュールの読み込み」と「Flaskインスタンス作成」。
 「multiprocessingモジュールの読み込み」と「Flaskインスタンス作成」
 
プロセス名を指定して、プロセスを生成する。
プロセス名を指定して、プロセスを生成する
 
Flaskアプリの起動と引数の渡し方。
Flaskアプリの起動と引数の渡し方
 
 
 
3. 実装したPythonプログラム/strong>
 

1⃣.データを受信し、データベースに蓄積する「Pythonプログラム」と、
2⃣.データベースを読込んで、グラフ表示する「Pythonプログラム」の、
2つのPythonを合体させ「multiprocessing」として実装。
jusin-graph_01.py

#!/usr/bin/python
# -*- coding: utf-8 -*

import socket
import datetime #日付ライブラリ読み込み
import json
import MySQLdb #DBライブラリの読み込み
from flask import Flask, render_template
from multiprocessing import Process
# -----------------------------------------------

# Flaskインスタンス作成
app = Flask(__name__)

# -----------------------------------------------
# 並列処理-1 DBを読んでグラフ表示

@app.route('/')
def index():

	# 2次元リストを定義
	temp_list = []

	# MySQLに接続
	connector = MySQLdb.connect(
		host = "localhost",
		db = "ondodb",
		user = "flaskpy",
		passwd = "dht22x4",
	# テーブル内部で日本語を扱うために追加
		charset = "utf8"
	)

	# カーソル取得
	cursor = connector.cursor()

	# SQLクエリ実行(データの読込)
	sql = "select * from meas_value where DATE_ADD(nitiji, INTERVAL 24 HOUR) > NOW()"
	cursor.execute(sql)

	# 結果を取得し、2次元リストの形式に変換
	records = cursor.fetchall()
	for record in records:
		temp_list.append(
		{'nitiji':record[0].strftime("%Y-%m-%d %H:%M"),
		 'temp1':record[1],
		 'temp2':record[2],
		 'temp3':record[3],
		 'temp4':record[4]}
		)

	# データベースとの接続を閉じる
	cursor.close()
	connector.close()

	#テンプレートへ挿入するデータの作成
	title = "Raspberry Pi で測定した温度の推移(24H)グラフ"

	return render_template('template.html', title=title, temp_list=temp_list)


# 並列処理-2 受信とDB書込み
def jusin2db():

	# サーバーのIPアドレスとポート
	server_ip = "192.168.11.112"
	server_port = 5000

	# ソケットを作成
	server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

	# ソケットを指定のIPアドレスとポートにバインド
	server_socket.bind((server_ip, server_port))

	# 接続待機
	server_socket.listen(1)  # 同時に接続できるクライアントの数
	print(f"サーバーが {server_ip}:{server_port} で待機中...")

	my_dict = {}

	# データの受信と表示
	while True:
		# クライアントからの接続を待機
		client_socket, client_address = server_socket.accept()
		print(f"クライアント {client_address} が接続しました。")

		data = client_socket.recv(1024)  # 最大1024バイトを受け入れる
		if not data:
		    break

		print(f"受信したデータ: {data.decode('utf-8')}")
		my_dict = json.loads(data.decode('utf-8'))
		print(my_dict)

		# 各要素を取り出す
		value_1 = my_dict['1']
		value_2 = my_dict["2"]
		value_3 = my_dict["3"]
		value_4 = my_dict["4"]
		value_5 = my_dict["5"]

		client_socket.close()

		# DB書き込み:MySQL接続------------------------------------
		connector = MySQLdb.connect(
			host = "localhost",
			db = "ondodb",
			user = "flaskpy",
			passwd = "dht22x4",
		# テーブル内部で日本語を扱うために追加
			charset = "utf8"
		)

		# カーソル取得
		cursor = connector.cursor()

		# SQLクエリ実行(データ追加)
		sql = "insert into meas_value (nitiji, temp1, temp2, temp3, temp4) VALUES (%s, %s, %s, %s, %s)"
		val = (value_1, value_2, value_3, value_4, value_5)
		cursor.execute(sql, val)

		connector.commit()

		# カーソル終了
		cursor.close()

		# MySQL切断
		connector.close()

		# DB書込み終了---------------------------------------------

# 並列処理-1 DBを読んでグラフ表示を起動
def db2graph(**kwargs):
    app.run(**kwargs)


if __name__ == '__main__':
    server = Process(target = db2graph, kwargs = {'host': '0.0.0.0', 'port': 5001, 'threaded': True})
    server.start()

    jusin2db()
 
上記例では、 全ての処理をコーディングしたが、単独で実行していた既存の
【データを受信し、データベースに蓄積する「Pythonプログラム」jusin_05.py】を読み込む方法(import)で、「並列処理-2」を実行しても問題なく稼働する。
# ---省略--------------------------------------

import jusin_05

# ---省略--------------------------------------

# 並列処理-2 受信とDB書込み
def jusin2db():
    jusin_05.start_server()

# ---省略--------------------------------------
 
参考:
 
 
 
4. 実行結果
 
合体したPythonを実行してみる。
python3 work/jusin-graph_01.py
 
合体したPythonを実行した結果
 
「サーバーが 192.168.11.112:5000 で待機中...」と
「* Running on http://0.0.0.0:5001/ (Press CTRL+C to quit)」の
2つが表示された。
 
nohupをつけてコマンドを実行する。(コマンドの最後にアンパサンド(&)をつけると、バックグラウンドで起動できる。)
nohup python3 work/jusin-graph_01.py &
 
nohupをつけてコマンドを実行する
 
プロセス番号「5054」が表示される。
 
実行中のプロセスを表示してみる。
ps aux | grep "python"
 
実行中のプロセスを表示
実行した「jusin-graph_01.py」で「5054」と「5055」の2つのプロセスが動いている。
 
パソコンから、Raspberry Pi のIPアドレスにアクセスすると、温度の推移グラフが表示される。(http://192.168.11.112:5001/)
ブラウザに表示された温度の推移グラフ
 
 
以上。
(2024.01.29)
 

 

スポンサー リンク

 

             

 

 

 

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください