Loading web-font TeX/Math/Italic

November 17, 2012

Pythonでパーティクルフィルタを実装してみる

パーティクルフィルタ(Particle filter)は,隠れマルコフモデルカルマンフィルタと同じように,システムの観測Yから状態Xを推定する手法.どれもベイジアンフィルタに基づくもので,確率分布p(X_t;Y_{0:t})の表し方が異なる1のですが,パーティクルフィルタでは有限個のサンプリングによって確率分布を近似します.今回は重点サンプリング2を使ったパーティクルフィルタを実装してみます.ほかのフィルタに比べてループぐるぐる回すだけだからすごく簡単!


1. 隠れマルコフモデルはヒストグラム(離散),カルマンフィルタはガウシアン(パラメトリック),パーティクルフィルタはサンプリング(ノンパラメトリック)で表す
2. SciPyには有名ドコロの確率分布からサンプリングする関数が用意されている.任意の確率分布からサンプリングしたい場合には逆関数法,棄却サンプリング,重点サンプリングといった手法などを使う


パーティクルフィルタ


  • たくさんの粒子をばらまいておいて,それっぽく動かして,観測して,各々実際の観測とのズレを測って,正解っぽいっぽい粒子だけ残す,っていうのを繰り返す
  • 入力はN個のパーティクルの状態{\bf x}_t^{(i)},重み{\bf w}_t^{(i)} (i=1,...,N)と制御入力{\bf u}_tと観測{\bf y}_t
  • 出力は更新された状態{\bf x}_{t+1}^{(i)},重み{\bf w}_{t+1}^{(i)}
  • 状態方程式fと観測方程式gが与えられている
    {\bf x}_{t+1} = f({\bf x}_t, {\bf u}_t) + {\bf w} \leftrightarrow p({\bf x}_{t+1}|{\bf x}_t, {\bf u}_t)\\ {\bf y}_t = g({\bf x}_t) + {\bf v} \leftrightarrow p({\bf y}_{t}|{\bf x}_t)
  • 確率分布p({\bf x}_t|{\bf y}_{0:t})をN個の重み付きサンプル\{w_t^{(i)}, {\bf x}_t^{(i)}\}(i=1,...,N)で近似.\deltaはデルタ関数.
    p({\bf x}_{t}|{\bf y}_{0:t}) \approx \sum_{i=1}^{N} w_t^{(i)} \cdot \delta ({\bf x}_t - {\bf x}_t^{(i)})
  • 新しい観測{\bf y}_{t+1}があったら状態推定分布p({\bf x}_{t+1}|{\bf y}_{0:t+1})を3つのステップで更新

    1. 推定
    2. 更新
    3. リサンプリング


例題


2次元座標において、あるロボットがt=0に原点を出発して、速度(4,4)で動くとする。ロボットの進路は風などの影響を受け(\sigma_x=\sigma_y=2),毎秒ごと4つの点(0,0),(10,0),(0,10),(10,10)からの距離を計測できて、計測には距離によらない誤差がある(\sigma_x=\sigma_y=4)とする.このとき観測された軌跡から実際の軌跡を推定する.

Fig. 1 ピヨピヨ



推定


for i in range(N): {\bf x}_{t+1}^{(i)} \sim p({\bf x}_{t+1}^{(i)}|{\bf x}_{t}^{(i)}, {\bf u}_{t})
実際にはN個のパーティクルの位置を状態方程式に代入.
{\bf x}_{t+1}^{(i)} = f({\bf x}_{t}^{(i)}, {\bf u}_{t}) = {\bf A}{\bf x}_{t}^{(i)} + {\bf B}{\bf u}_{t} + {\bf w}
ただし
{\bf A} = \left[ \begin{array}{cc} 1 & 0 \\ 0 & 1 \\ \end{array} \right], {\bf B} = \left[ \begin{array}{cc} 1 & 0 \\ 0 & 1 \\ \end{array} \right], {\bf w} \sim N(0, 2{\bf I})


更新


for i in range(N): w_{t+1}^{(i)} \leftarrow w_{t}^{(i)} \cdot p({\bf y}_{t+1}^{(i)}|{\bf x}_{t+1}^{(i)})
尤度関数によって重みを更新.\sum^i w_{t+1}^{(i)} = 1で正規化.今回はモデルを正規分布にしたのでRBFカーネルで.尤度関数は推定値と観測値が似てれば似てるほど大きくなるように設定.物体追跡検知とかだと色の情報を使う.
p({\bf y}_{t+1}^{(i)}|{\bf x}_{t+1}^{(i)}) \propto \exp(-\frac{(y-g(x))^2}{\sigma^2})
ただし
g({\bf x}) = \left[ ||{\bf x}-{\bf p}_1||, ||{\bf x}-{\bf p}_2||, ||{\bf x}-{\bf p}_3||, ||{\bf x}-{\bf p}_4|| \right]^{\mathsf{T}}\\ {\bf p}_1=\left[0, 0\right]^{\mathsf{T}}, {\bf p}_2=\left[10, 0\right]^{\mathsf{T}}, {\bf p}_3=\left[0, 10\right]^{\mathsf{T}}, {\bf p}_4=\left[10, 10\right]^{\mathsf{T}} \\ \sigma^2 = 4


リサンプリング


\{ {\bf x}_{t+1}^{(i)}, w_{t+1}^{(i)}=1 \} \leftarrow resampling(\{ {\bf x}_{t+1}^{(i)}, w_{t+1}^{(i)} \})
重みに応じてリサンプリング.重みが偏らないように.毎回やる必要はない.色々手法があるらしいけど今回は単純に多項分布でサンプリング.


結果



Fig. 2 10ステップ分.が実際の軌跡.がパーティクル.がパーティクル平均.線形システムだから全然パーティクルフィルタ使う意味なかったけど…




実装


要NumPy

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Particle Filter
"""
import numpy
def f(x, u):
"""Transition model
- 状態方程式
f(x, u) = A * x + B * u + w
w ~ N(0, 2I)
- AとBは単位ベクトル
"""
# noise sigma=2
w_mean = numpy.zeros(2)
w_cov = numpy.eye(2) * 2.
w = numpy.random.multivariate_normal(w_mean, w_cov)
x_transition = numpy.dot(numpy.diag([1., 1.]), x) + numpy.dot(numpy.diag([1., 1.]), u) + w
return x_transition
def g(x):
"""Obersvation model
- 観測方程式
g(x) = [|x-p1|, |x-p2|, |x-p3|, |x-p4|].T + v
v ~ N(0, 4I)
- ある4点からの距離
"""
# observation points
p1 = [0., 0.]; p2 = [10., 0.]; p3 = [0., 10.]; p4 = [10., 10.]
# noise sigma=4
v_mean = numpy.zeros(4)
v_cov = numpy.eye(4) * 4.
v = numpy.random.multivariate_normal(v_mean, v_cov)
# observation vector
y = numpy.array([numpy.linalg.norm(x - p1),
numpy.linalg.norm(x - p2),
numpy.linalg.norm(x - p3),
numpy.linalg.norm(x - p4)]) + v
return y
def liklihood(y, x):
"""Liklihood function
- 尤度関数
p(y|x) ~ exp(|y-g_(x)|**2 / simga**2)
- g_は誤差なしの観測方程式とする
- v ~ N(0, 4I)としたのでsigma**2=4
- 尤度 = 推定値と観測値との類似度
- アプリケーションによって適切に決めないといけない
- 物体追跡だと色情報を使ったりするかも
"""
p1 = [0., 0.]; p2 = [10., 0.]; p3 = [0., 10.]; p4 = [10., 10.]
g_ = lambda x: numpy.array([numpy.linalg.norm(x - p1), numpy.linalg.norm(x - p2), numpy.linalg.norm(x - p3), numpy.linalg.norm(x - p4)])
return numpy.exp(-numpy.dot(y - g_(x), y - g_(x)) / 4)
def state_transition(x, u):
"""State transition
- 状態方程式に現在のxを代入して未来のxを計算
x_predicted = f(x, u)
"""
x_predicted = f(x, u)
return x_predicted
def importance_sampling(weight, x_predicted, y):
"""Importat Sampling
- 実際の観測yに対して観測方程式でxから推定したyがどれくらいずれているか計算(尤度関数)
p(y|x_predicted) <- g(x_predicted)
- 上の計算結果に応じて重みを更新
weight_updated = weight * p(y|x_predicted)
"""
weight_updated = weight * liklihood(y, x_predicted)
return weight_updated
def resampling(X, W):
"""Resampling
- 一部のサンプルに重みが集中するを防ぐために重みに応じて子孫を生成
"""
X_resampled = []
samples = numpy.random.multinomial(len(X), W)
for i, n in enumerate(samples):
for j in range(n):
X_resampled.append(X[i])
W_resampled = [1.] * len(X)
return X_resampled, W_resampled
def pf_sir_step(X, W, u, y):
"""One Step of Sampling Importance Resampling for Particle Filter
Parameters
----------
X : array of [float|array]
状態 List of state set
W : array of float
重み List of weight
u : float or array
制御入力 Control input
y : float or array
観測 Observation set
Returns
-------
X_resampled : array of [float|array]
次の状態 List updated state
W_resampled : array of float
次の重み List updated weight
"""
# パーティクルの数 num of particles
N = len(X)
# 初期化
X_predicted = range(N)
W_updated = range(N)
# 推定 prediction
for i in range(N):
X_predicted[i] = state_transition(X[i], u)
# 更新 update
for i in range(N):
W_updated[i] = importance_sampling(W[i], X_predicted[i], y)
# 正規化 normalization
w_updated_sum = sum(W_updated) # 総和 the sum of weights
for i in range(N):
W_updated[i] /= w_updated_sum
# リサンプリング re-sampling (if necessary)
X_resampled, W_resampled = resampling(X_predicted, W_updated)
return X_resampled, W_resampled
def main():
# 条件
T = 10 # ステップ数
N = 100 # パーティクル数
u = [4., 4.] # 制御入力
# 実際の軌跡と観測値
actuals = []
Y = []
x = [0., 0.]
for i in range(T):
x = f(x, u)
y = g(x)
actuals.append(x)
Y.append(y)
# 初期条件
X = [] # 初期パーティクル
W = [] # 初期重み
for i in range(N):
X.append(numpy.random.randn(2) * 20)
W.append(1.)
# パーティクルの位置と推定値の保存用
particles = [X]
predictions = [numpy.mean(X, axis=0)]
# パーティクルフィルタ
for i in range(T):
y = Y[i]
X, W = pf_sir_step(X, W, u, y)
particles.append(X)
predictions.append(numpy.mean(X, axis=0))
# JSON形式で保存
import json
d = {}
d["actuals"] = [x.tolist() for x in actuals]
d["particles"] = [[x.tolist() for x in particle] for particle in particles]
d["predictions"] = [x.tolist() for x in predictions]
print >> open("pf.json", "w"), json.dumps(d)
if __name__ == '__main__':
main()
view raw pf.py hosted with ❤ by GitHub

No comments:

Post a Comment