MobileNet を使ったアプリの作成例

アプリのパフォーマンス調査については actdk upload 後に Actcast からインストールして確認してください。

ActDK を用いたアプリケーション実行では、深層学習モデルは高速化されません。

Example: MobileNet を使ったアプリの作成 #

ここでは MobileNet 1000 クラス分類の訓練済みモデルを例に、Actcast でのアプリケーションの開発方法を説明します。

開発するアプリケーション #

今回作成するのは ImageNet で提供される 1000 クラスの分類を行い、どれか 90%以上の精度で識別できたとき Actcast を通して通知を行うアプリケーションです。 画像を公式カメラモジュールから撮影し、分類し、画面に描画します。

アプリケーションのイメージ

可能な設定項目 #

エッジコンピューティングの用途を考えると、分類タスクの処理結果はどこかネットワーク経由で通知されれば十分ですが、動作確認やデモのためにディスプレイに描画できると便利です。 そのためディスプレイ描画を行うかどうか on/off を切り替えられるようにします。

また用途により検出の精度に対する考え方が異なるため、しきい値を外から変更可能にします。

  • display: display 出力の有無
  • threshold: 検出イベントのしきい値

分類の結果を確認するまでの機能が提供範囲で実装可能です。

Actcast アプリケーションの概略 #

Actcast アプリケーションは通常の Linux アプリケーションです。 ネットワーク機能など一部の OS の機能は制限されており、IoT 機器向け機能としての死活管理・設置確認のために以下の項目の実装が求められます。

  • Heartbeat 機能
  • TakePhoto 機能

アプリケーションは柔軟な運用のため、通常設定項目を持ちます。 例えば、今回のアプリケーションでは「ディスプレイ描画を on にする」「通知のしきい値は 90%以上である」「猫だけ検出をする」などの設定が出来るようにしたいと思うでしょう。 Actcast でもアプリケーションには設定項目をもたせることができます。

開発者はアプリケーションスキーマを定義する必要があります。

アプリケーションの構成 #

Actcast アプリケーションでは app ディレクトリに、以下の 2 つのファイルを必ず作成する必要があります。

  • main
  • healthchecker

mainはアプリケーションのエントリポイントとなるプログラムです。 実行可能などのような形式でも実装可能です。 今回は Python で実装します。

healthcheckerはアプリの正常な動作を監視するためのスクリプトです。 Docker の実行の仕組みを利用しています。

これら 2 つのファイルを作成すると、以下のようなディレクトリ構造になります。

application-root
└── app
   ├── main
   └── healthchecker

プロジェクトの初期化 #

Actcast サービス上でアプリケーションを作成した後、以下のコマンドで Actcast SDK 用のプロジェクトを初期化します。

$ actdk init -i $APP_ID -s "A 1000-class object classfier" object_classifier
  • object_classifier
    • 任意の名称
  • -i オプション
    • Actcast Web UI で発行されたアプリケーション ID。$APP_ID は発行されたアプリケーション ID (例: 48) に置き換えてください。
  • -s オプション
    • 任意の 1 行説明

モデルから共有ライブラリを生成 #

actcast-app-examples リポジトリ で公開されている ImageNet Classification アプリ内のモデルを利用します。

model.nnoir ファイルを model ディレクトリに配置し、actdk compile で共有ライブラリを生成します。

$ wget -O model/model.nnoir https://github.com/Idein/actcast-app-examples/raw/master/imagenet-classification-for-raspi/model/model.nnoir
$ actdk compile
release: Pulling from idein/nnoir2stubc
...(snipped)...
compile finished

これにより以下のファイルが生成されます。

$ find . -name "*model.*"
./model/model.nnoir
./app/libmodel.so
./app/model.py
./include/model.h

libmodel.soは nnoir ファイルで表現される深層学習モデルを実行するための共有ライブラリです。 そのインタフェースとしてmodel.hmodel.pyが生成されます。 今回は Python で実装するためmodel.hは利用しませんが、model.hを使えば C 言語などでも開発できます。 libmodel.so の制限事項など詳細はコンパイラの仕様を参照して下さい。

model.py #

model.pyは Python からlibmodel.soを利用するためのインタフェースを提供します。 この例ではMobileNet_v2が深層学習モデルの推論処理を実行するエントリポイントです。 入出力は numpy の配列で、形状は nnoir 生成時の多次元配列形状が保持されます。

この場合、入力input0は numpy の配列で形状は(1, 224, 224, 3)です。出力の配列の形状は(1, 1001)です。

from ctypes import cdll
import numpy as np


class Model:

    def __init__(self, sopath='/root/libmodel.so'):
        lib = cdll.LoadLibrary(sopath)

        self.run_MobileNet_v2 = lib.MobileNet_v2_with_zerocopy
        self.run_MobileNet_v2.argtypes = [
            np.ctypeslib.ndpointer(dtype=np.float32, shape=(1, 224, 224, 3, ), flags="C_CONTIGUOUS"),
            np.ctypeslib.ndpointer(dtype=np.float32, shape=(1, 1001, ), flags="C_CONTIGUOUS"),
        ]

    def MobileNet_v2(self, input0):
        output0 = np.empty((1, 1001, ), dtype=np.float32)
        self.run_MobileNet_v2(input0, output0)
        return output0,

アプリケーション設定 #

今回はsetting_schema.jsondata_schema.jsonを、それぞれ次のように記述します。 Manifesto はdefault.jsonをそのまま使用します。

setting_schema.json #

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "display": {
            "title": "display",
            "description": "output video to HDMI display",
            "type": "boolean",
            "default": false
        },
        "threshold": {
            "title": "probability threshold",
            "description": "notify when over this threshold",
            "type": "number",
            "default": 0.9,
            "minimum": 0,
            "maximum": 1
        }
    },
    "required": [
        "display",
        "threshold"
    ],
    "propertyOrder": [
        "display",
        "threshold"
    ]
}

data_schema.json #

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "array",
  "items": {
    "type": "object",
    "properties": {
      "prob": {
        "title": "probability",
        "description": "matching score",
        "type": "number",
        "minimum": 0.0,
        "maximum": 1.0
      },
      "label": {
        "title": "label",
        "description": "the most matching class name",
        "type": "string"
      }
    },
    "required": ["prob", "label"]
  },
  "propertyOrder": []
}

mainファイル #

前項で生成した共有ライブラリを用いてアプリケーションコードを記述します。 このアプリの構成は次のようになります。

アプリの構成

#!/usr/bin/python3
import argparse
from PIL import Image, ImageDraw, ImageFont
import actfw_core
from actfw_core.task import Pipe, Consumer
from actfw_core.capture import V4LCameraCapture
import actfw_raspberrypi
from actfw_raspberrypi.vc4 import Display
import numpy as np
from model import Model

(CAPTURE_WIDTH, CAPTURE_HEIGHT) = (224, 224)  # capture image size
(DISPLAY_WIDTH, DISPLAY_HEIGHT) = (640, 480)  # display area size


class Classifier(Pipe):

    def __init__(self, capture_size):
        super(Classifier, self).__init__()
        self.model = Model()
        self.capture_size = capture_size

    def proc(self, frame):
        rgb_image = Image.frombuffer('RGB', self.capture_size, frame.getvalue(), 'raw', 'RGB')
        rgb_image = rgb_image.resize((CAPTURE_WIDTH, CAPTURE_HEIGHT))
        input_image = np.asarray(rgb_image).reshape(1, CAPTURE_WIDTH, CAPTURE_HEIGHT, 3).astype(np.float32)
        probs, = self.model.MobileNet_v2(input_image)
        return (rgb_image, probs[0][1:])


class Presenter(Consumer):

    def __init__(self, settings, preview_window, cmd):
        super(Presenter, self).__init__()
        self.settings = settings
        self.preview_window = preview_window
        self.cmd = cmd
        self.font = ImageFont.truetype(font='/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf', size=18)
        with open('labels.txt') as f:
            self.labels = f.read().splitlines()

    def proc(self, images):
        rgb_image, probs = images
        top1 = probs.argsort()[-1]
        if probs[top1] > self.settings['threshold']:
            actfw_core.notify([{'prob': float(probs[top1]), 'label': self.labels[top1]}])
        self.cmd.update_image(rgb_image)  # update `Take Photo` image
        actfw_core.heartbeat()
        if self.preview_window is not None:
            draw = ImageDraw.Draw(rgb_image)
            draw.text((0, 0), "{:>6.2f}% {}".format(100 * probs[top1], self.labels[top1]), font=self.font, fill=(0, 255, 0))
            self.preview_window.blit(rgb_image.tobytes())
            self.preview_window.update()


def main(args):

    # Actcast application
    app = actfw_core.Application()

    # Load act setting
    settings = app.get_settings({'display': True, 'threshold': 0.8})

    # CommandServer (for `Take Photo` command)
    cmd = actfw_core.CommandServer()
    app.register_task(cmd)

    # Capture task
    cap = V4LCameraCapture('/dev/video0', (CAPTURE_WIDTH, CAPTURE_HEIGHT), 15, format_selector=V4LCameraCapture.FormatSelector.PROPER)
    capture_size = cap.capture_size()
    app.register_task(cap)

    # Classifier task
    conv = Classifier(capture_size)
    app.register_task(conv)

    def run(preview_window=None):

        # Presentation task
        pres = Presenter(settings, preview_window, cmd)
        app.register_task(pres)

        # Make task connection
        cap.connect(conv)  # from `cap` to `conv`
        conv.connect(pres)  # from `conv` to `pres`

        # Start application
        app.run()

    if settings['display']:
        with Display() as display:
            display_width, display_height = display.size()
            scale = min(float(display_width / CAPTURE_WIDTH), float(display_height / CAPTURE_WIDTH))
            width = int(scale * CAPTURE_WIDTH)
            height = int(scale * CAPTURE_HEIGHT)
            left = (display_width - width) // 2
            upper = (display_height - height) // 2
            with display.open_window((left, upper, width, height), (CAPTURE_WIDTH, CAPTURE_HEIGHT), 1000) as preview_window:
                run(preview_window)
    else:
        run()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='example: 1000 class classification')
    main(parser.parse_args())

分類器クラスの実装 #

1000 クラス分類を行うClassifierクラスはPipeオブジェクトを継承して作成します。

class Classifier(Pipe):

    def __init__(self, capture_size):
        super(Classifier, self).__init__()
        self.model = Model()
        self.capture_size = capture_size

    def proc(self, frame):
        rgb_image = Image.frombuffer('RGB', self.capture_size, frame.getvalue(), 'raw', 'RGB')
        rgb_image = rgb_image.resize((CAPTURE_WIDTH, CAPTURE_HEIGHT))
        input_image = np.asarray(rgb_image).reshape(1, CAPTURE_WIDTH, CAPTURE_HEIGHT, 3).astype(np.float32)
        probs, = self.model.MobileNet_v2(input_image)
        return (rgb_image, probs[0][1:])

Classifierでは、実際に分類タスクを行うModel.Mobilenet_v2メソッドに与える引数の型・配列の形状などのインターフェイスを揃え、分類タスクを呼び出します。 actfw_core.capture.V4LCameraCaptureactfw_core.Frameオブジェクトをキューに挿入するため、以下のようなインターフェイスを合わせる処理をしています。

  • getvalue()で RGB 画像のバイト列を取得
  • 後々、扱い易いようPIL.Image.frombufferImageオブジェクトを作成
  • MobileNet_v2 の入力に合うよう、numpy の配列に変換、reshape、float への型変換を行う

画面描画部分の実装 #

画面描画を担当するタスクPresenterは画面描画だけではなく、以下の処理を行っています。

  • CommandServer用画像更新: CommandServerオブジェクトを初期化時に設定し、proc内でそのupdate_imageを呼び出す
  • アプリケーション死活監視: proc内でactfw_core.heartbeat()を呼び出す

actfw_core.notifyは Actcast に送信する Act Log の内容を記述します。 この場合はdata_schema.jsonの書式に従い、分類結果の確率とラベルを送信しています。

class Presenter(Consumer):

    def __init__(self, settings, preview_window, cmd):
        super(Presenter, self).__init__()
        self.settings = settings
        self.preview_window = preview_window
        self.cmd = cmd
        self.font = ImageFont.truetype(font='/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf', size=18)
        with open('labels.txt') as f:
            self.labels = f.read().splitlines()

    def proc(self, images):
        rgb_image, probs = images
        top1 = probs.argsort()[-1]
        if probs[top1] > self.settings['threshold']:
            actfw_core.notify([{'prob': float(probs[top1]), 'label': self.labels[top1]}])
        self.cmd.update_image(rgb_image)  # update `Take Photo` image
        actfw_core.heartbeat()
        if self.preview_window is not None:
            draw = ImageDraw.Draw(rgb_image)
            draw.text((0, 0), "{:>6.2f}% {}".format(100 * probs[top1], self.labels[top1]), font=self.font, fill=(0, 255, 0))
            self.preview_window.blit(rgb_image.tobytes())
            self.preview_window.update()

main #

プログラムの実行のエントリポイントの実装は、図:アプリの構成をご覧ください。

def main(args):

    # Actcast application
    app = actfw_core.Application()

    # Load act setting
    settings = app.get_settings({'display': True, 'threshold': 0.8})

    # CommandServer (for `Take Photo` command)
    cmd = actfw_core.CommandServer()
    app.register_task(cmd)

    # Capture task
    cap = V4LCameraCapture('/dev/video0', (CAPTURE_WIDTH, CAPTURE_HEIGHT), 15, format_selector=V4LCameraCapture.FormatSelector.PROPER)
    capture_size = cap.capture_size()
    app.register_task(cap)

    # Classifier task
    conv = Classifier(capture_size)
    app.register_task(conv)

    def run(preview_window=None):

        # Presentation task
        pres = Presenter(settings, preview_window, cmd)
        app.register_task(pres)

        # Make task connection
        cap.connect(conv)  # from `cap` to `conv`
        conv.connect(pres)  # from `conv` to `pres`

        # Start application
        app.run()

    if settings['display']:
        with Display() as display:
            display_width, display_height = display.size()
            scale = min(float(display_width / CAPTURE_WIDTH), float(display_height / CAPTURE_WIDTH))
            width = int(scale * CAPTURE_WIDTH)
            height = int(scale * CAPTURE_HEIGHT)
            left = (display_width - width) // 2
            upper = (display_height - height) // 2
            with display.open_window((left, upper, width, height), (CAPTURE_WIDTH, CAPTURE_HEIGHT), 1000) as preview_window:
                run(preview_window)
    else:
        run()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='example: 1000 class classification')
    main(parser.parse_args())

ラベルリスト #

こちら のファイルを app/labels.txt という名前で保存します。 このファイルの各行は synset のラベルに対応しています。

tench, Tinca tinca
goldfish, Carassius auratus
great white shark, white shark, man-eater, man-eating shark, Carcharodon carcharias
tiger shark, Galeocerdo cuvieri
hammerhead, hammerhead shark
electric ray, crampfish, numbfish, torpedo
.
.
.

依存パッケージ #

.actdk/dependencies.json に、このアプリケーションが依存するパッケージを記載します。

{
  "apt": [
    "libv4l-0",
    "libv4lconvert0",
    "python3-pil",
    "python3-numpy",
    "fonts-dejavu-core"
  ],
  "pip": [],
  "raspberrypi-buster": {
    "apt": [
      "libraspberrypi0"
    ],
    "pip": [
      "actfw-raspberrypi"
    ]
  }
}

healthchecker #

app/healthchecker を作成します。 アプリケーションの実装 を参考に実装してください。

アプリケーションのビルド #

次のコマンドを実行することにより、Actcast アプリケーションをビルドします。

(開発版)

$ actdk build <IDENTIFIER_YOU_LIKE>

(リリース版)

$ actdk build --release

動作確認 #

リリース版の場合、上記の Actcast アプリケーションのビルドが終了している状態で、以下のコマンドによりアプリケーションを Raspberry Pi に転送します。

$ actdk deploy <IDENTIFIER_YOU_LIKE>

リリース版の転送が完了している、あるいは開発版でビルドしている場合は、以下のコマンドにより Raspberry Pi 上でアプリケーションを実行できます。

(開発版)

$ actdk run <IDENTIFIER_YOU_LIKE> -a

(リリース版)

$ actdk run <IDENTIFIER_YOU_LIKE> -a --release

このときアプリに渡す設定を用意する必要があります。設定はカレントディレクトリのact_settings.jsonというファイルに書きます。 このファイルは、actdk runの際に Raspberry Pi に転送されます。

以下の json はact_settings.jsonの例です。ディスプレイ表示を on に、しきい値を 0.7 に指定しています。

{
  "display": true,
  "threshold": 0.7
}

act_settings.jsonactdk generate act_settings コマンドで生成することもできます。

act_settings.jsonは JSON スキーマファイルsetting_schema.jsonに従う必要があり、適合しているかはactdk run実行時に検査されます。 検査をパスしないと Raspberry Pi での実行は行えません。もしくは、下記のコマンドでact_settings.json及び各スキーマファイルの検査だけを行えます。

$ actdk validate-json

このアプリではカメラを利用するため、テスト用の Raspberry Pi に正しく USB カメラか RaspberryPi カメラモジュール が接続されている必要があります。

-aオプションがついているとき、Actcast アプリケーションの標準出力は作業用コンピュータに転送されます。 -aオプションを省略した場合、Actcast アプリケーションの実行の状態にかかわらずコマンド実行は終了します。

アプリケーションを終了させるためには以下のコマンドを実行します。

$ actdk stop <IDENTIFIER_YOU_LIKE>

-aを付けていた場合、C-c で中断すると Raspberry Pi 上でのアプリケーションは終了します。

Actcast でのオンラインビルド後の動作確認 #

手元の Raspberry Pi 上で動作確認が取れたら、オンラインアップロードを行い Actcast を経由して アプリを利用可能にします。詳細は Actcast へのアップロードとテスト をご覧ください。

アップロードしたビルドイメージは Actcast にて再ビルドされ、libmodel.soが高速なものに差し替えられます。 このオンラインビルドが終了したアプリケーションは、通常の Actcast アプリケーションと同様に Web UI から Act として手元の Raspberry Pi にインストールし、動かすことが可能です。

この際、Raspberry Pi に挿入する SD カードは通常の Actcast クライアント用イメージを書き込んだものを使います。 オフラインでの動作確認で使用した確認用イメージとは別であることに注意してください。


アプリケーション開発 に戻る