actfw

カメラを用いたアプリケーションでは、カメラの撮像とクラス分類の推論を高スループットで行うために 3 つの仕事をタスク並列で実行する必要があります。

  • Capture: カメラの撮像をする
  • Predicator: 分類タスクを行う
  • Presenter: 画面描画を行う

この中に含まれるカメラの撮像や Actcast への Heartbeat などの機能は定形であることが多いです。 これらの機能を Python ライブラリ化したものがactfwです。

actfw の使い方

ライブラリ構成

actfw として次の 2 つが提供されています。

actfwのインストール

actfw-*は pip によりインストールすることが出来ます。 actdk initにより生成されたdependencies.jsonではactfw-raspberrypiがデフォルトでインストールされるようになっています。

actfwの計算モデル

actfwは非同期計算を記述するための Idein 提供ライブラリです。 このライブラリ上のタスクは「このタスクが行われたとき実行する次のタスク」を登録するモデルになっており、データの流れをconnectメソッドで定義します。

actfwのモデル

Applicationクラスが全体をコントロールします。

app = actfw_core.Application()
app.run()

非同期に実行するタスクは以下の 3 つに分けられます。

  • Producer : リソースを生成し続ける処理
  • Pipe : リソースを変形して次のタスクに渡す処理
  • Consumer : リソースを最終的に消費し続ける処理

例えば、次に示す 3 つのタスクがあるとします。

  • カメラから画像を取得(Producer)
    • RGB 画像を撮影し続ける処理
  • 取得画像を白黒にコンバート(Pipe)
    • RGB 画像をグレイスケール画像に変換する処理
  • 画面に描画(Consumer)

これらは以下のように記述できます。

import actfw_core
from actfw_core.task import Pipe, Consumer
from actfw_core.system import find_csi_camera_device
from actfw_core.unicam_isp_capture import UnicamIspCapture
import actfw_raspberrypi
from actfw_raspberrypi.vc4 import Display

class Converter(Pipe):
  ...

class Presenter(Consumer):
  ...

app = actfw_core.Application()

# Capture task
capture_size = (CAPTURE_WIDTH, CAPTURE_HEIGHT)
framerate = 30
device = find_csi_camera_device()
cap = UnicamIspCapture(unicam=device, size=capture_size, framerate=framerate)
app.register_task(cap)

# Converter task
conv = Converter()
app.register_task(conv)

# Presentation task
pres = Presenter(settings, camera, cmd)
app.register_task(pres)

# Make task connection
cap.connect(conv)  # from `cap` to `conv`
conv.connect(pres) # from `conv` to `pres`

app.run()

この例はGetting Started の grayscale exampleを簡略化したものです。

カメラの設定

Raspberry Pi で利用可能なカメラは 2 種類に分けられ、利用可能なインターフェイスが異なります。

Official Raspberry Pi Camera Module

Raspberry Pi が公式に提供しているカメラモジュールです。 actfw_core.unicam_isp_capture.UnicamIspCapture を利用する必要があります。

  • Pi Camera Module 1
  • Pi Camera Module 2
  • Pi Camera Module 3
    • Pi Camera Module 3 は ActcastOS 1,2 ではサポートされていません

一般的な USB カメラ

actfw-core.capture.V4LCameraCapture を利用する必要があります。

TakePhoto 機能の実装

カメラを用いるアプリケーションは設置確認のため、TakePhoto コマンド要求に対応することを推奨します。

actfw では以下のようにCommandServerクラスを用いることで簡単に実装することが出来ます。 CommandServerは内部的に画像を保持し、画像送信要求を受け取ると内部的に保持していた画像を返します。 そのためupdate_imageメソッドで要求を受け取ったときに返す画像をアップデートする必要があります。

class Presenter(Consumer):
    def __init__(self, ..., cmd):
      self.cmd = cmd

    def proc(self, ...):
      ...
      self.cmd.update_image(rgb_image) # update `Take Photo` image
      ...

# Actcast application
app = actfw_core.Application()

# CommandServer (for `Take Photo` command)
cmd = actfw_core.CommandServer()
app.register_task(cmd)

# Presentation task
pres = Presenter(..., cmd)
app.register_task(pres)

アプリケーションの死活監視

アプリケーションが連続的に画像を処理できていないなど何らかの不健全な状態にあると判断されたとき、Actcast デバイスエージェントはアプリケーションを再起動します。 アプリケーションの健全性の判定にはhealthcheckerが利用されます。

以下は、healthcheckerの実装例です。 /root/heartbeatファイルの更新が 1 分以内に起きていることを確認するスクリプトとなっています。

app/healthchecker
#!/bin/bash

HEARTBEAT_FILE='/root/heartbeat'

[ "$(find "${HEARTBEAT_FILE}" -mmin -1)" == "${HEARTBEAT_FILE}" ]

healthcheckerに対応するように、main側ではactfw_core.heartbeat()によって/root/heartbeatファイルを一定間隔で更新します。

class Presenter(Consumer):
  def __init__(self):
    ...

  def proc(self, image):
    # update image here
    actfw_core.heartbeat()

もし、以前のタスクが止まったりすると、この箇所で短い間隔で行われている筈の更新が行われなくなり、healthcheckerに検知されることになります。

画面の描画

一般的に Actcast アプリケーションは高度なセンシングのためのアプリケーションであることがほとんどであるため、画面の描画も必要無いことが多いです。 しかし、デモや動作確認のために画面に計算結果を描画できると便利な場合があります。 actfw-raspberrypiDisplayオブジェクトを用いると HDMI で接続されたディスプレイに RGB 画像を表示することが出来ます。

以下の例は画面に RGB 画像を表示するだけのPresenterタスクの例です。 画面描画を行うだけのPresenterタスクの例です。Presenterは、データを消費だけして次のタスクに渡さないConsumerとして作成しているので,procメソッドに返り値は不要です.

この例では、HDMI ディスプレイを640x480の領域として扱い、 320x240の RGB 画像を(0, 0, 640, 480)の矩形領域に拡大描画しています。

class Presenter(Consumer):
  def __init__(self, camera):
    self.display = Display(camera, (640, 480))

  def proc(self, image):
    self.display.update((0, 0, 640, 480), image.tobytes(), (320, 240), 'rgb')

LocalVideoServer 機能の実装

LocalVideoServer 機能を実装することで、デバイスと同一のネットワーク内からブラウザでライブ映像を確認することができます。 デバイスの設置作業や、設置後の動作確認時に有用なため、実装を推奨します。

詳細は Local Video Server 機能 を参照してください。

actfw が提供する既存の Task では不十分な場合の実装

actfw が提供する既存の Task では実現できない実装を行うには、 actfw-core が提供する Task のサブクラスを継承したクラスを定義して利用する必要があります。この場合、Act の終了処理を妨げない実装にしなければなりません。終了処理 にあるように、Act は SIGTERM シグナルによって終了します。SIGTERM を送信後一定時間アプリケーションが停止しない場合、SIGKILL シグナルによって強制終了されます。したがって SIGKILL による強制終了が発生しないようにすばやく終了するような実装である必要があります。

actfw を利用する場合、 SIGTERMApplication クラスによってハンドルされ、 Application クラスが管理する Task のサブクラスのインスタンスの stop メソッドが順番に呼び出され Act が終了します。

以下では継承するクラスごとに実装方法をまとめます。

Task もしくは Isolated を継承する場合

run メソッドを実装する必要があります。基本的には Task.running 変数を参照しつつループする形になります。なぜなら Task.running 変数は Task.stop メソッドの影響を受けるため ApplicationSIGTERM シグナルをハンドルしたときにループを抜けることが可能になるからです。

このループ内において 10 秒以上ブロックが発生する実装(例: time.sleep(10))をすると終了処理において SIGKILL が発生する場合があるので注意が必要です。例えば、1 時間に一度何かする処理を実装したい場合は、一度に sleep するのではなく、以下のように小刻みに sleep する必要があります(このサンプルコードは簡単のためカウントしていますが、時刻ベースでスケジューリングする方が望ましいです):

class ScheduledTask(Task):
    max_count: int

    def __init__(self, max_count: int) -> None:
        super().__init__()
        self.max_count = max_count

    def run(self) -> None:
        count = 0
        while self.running:
            actfw_core.heartbeat()  # 他の Task で heartbeat するなら不要

            if count >= self.max_count:
                self.do_task()
                count = 0

            count += 1
            time.sleep(1)

    def do_task(self) -> None:
        # やりたい処理
        pass

Producer , Pipe もしくは Consumer を継承する場合

run メソッドにはデフォルト実装があるので代わりに proc メソッドを実装する必要があります。 proc メソッドは run メソッド内のデフォルト実装においてループ内で繰り返し呼び出されるため proc メソッド内で 10 秒以上ブロックが発生する実装をすると終了処理において SIGKILL が発生する場合があるため注意が必要です。

またこれらのクラスのサブクラスにおいて終了時に特別な処理が必要な場合は cleanup メソッドを実装してください。(actfw-core 2.14.0 以上が必要です) cleanup メソッドは run メソッドの最後に呼び出されます。これは run メソッドで例外が発生した場合でも呼び出されるようになっています。 cleanup メソッドの実装において処理に時間がかかるとプロセス全体が SIGKILL されうるため十分すばやく完了する実装である必要があります。

class Counter(Producer):
    def __init__(self) -> None:
        super().__init__()
        self.count = 0

    def cleanup(self) -> None:
        # 十分すばやく完了しなければプロセス全体がSIGKILLで終了されうるので注意
        debug_log(f"Counter.cleanup: {self.count}")

    def proc(self) -> int:
        time.sleep(1)
        self.count += 1
        return self.count

終了処理を正しく実装できているかの判別方法

actdk run -a を実行して Ctrl-C で停止したとき Act の exit code が表示されます。この exit code が 0 であれば終了処理が問題なく実装されていると考えられます。一方で exit code が 137 の場合、SIGKILL によって Act が終了したことを意味します。この場合実装を見直して exit code が 0 になるように修正してください。

正常な場合:

$ actdk run -a
...
[{"current_time": "2025-09-17T16:15:07.258003+09:00"}]
^CSending stop signal to Act and waiting for it to stop...
Press Ctrl+\ to force quit
Act exited status: 0

問題がある場合:

$ actdk run -a
...
[{"current_time": "2025-09-17T16:20:46.361055+09:00"}]
^CSending stop signal to Act and waiting for it to stop...
Press Ctrl+\ to force quit
Act exited status: 137

終了処理が不適切な場合の問題

終了処理が不適切で SIGKILL によって停止させられる Act を運用する場合、Act の停止時に OS 再起動が発生します。これはたとえばファームウェアアップデート前などにおいて本来発生しない OS 再起動を引き起こしデバイスの安定運用やトラブル時の調査の妨げになります。

最終更新日