状態方程式と観測方程式が
\begin{align} {\bf x}_{t+1}&=A{\bf x}_t + {\bf w}_t\\ {\bf y}_{t}&=C{\bf x}_t + {\bf v}_t\\ {\bf w}_t &\sim N(0,Q)\\ {\bf v}_t &\sim N(0,R)\\ \end{align}というように表され,\{{\bf y}_1,...,{\bf y}_T\}が与えられたときに,A,C,Q,Rと初期状態の平均ベクトル\pi_1と分散共分散行列V_1を推定します.
論文がとても分かりやすいので,実装に必要なところだけ抜粋します.
Eステップ
以下のように定義します.
\begin{align} {\bf x}_t^{\tau}&=E({\bf x}_t|\{{\bf y}_1^{\tau}\})\\ V_t^{\tau}&=Var({\bf x}_t|\{{\bf y}_1^{\tau}\})\\ \end{align}
Kalman filter
\begin{align} {\bf x}_1^0 &= {\bf \pi}_1\\ V_1^0 &= V_1\\ {\bf x}_t^{t-1} &= A {\bf x}_{t-1}^{t-1}\\ V_t^{t-1} &= A V_{t-1}^{t-1} A' + Q\\ K_t &= V_t^{t-1}C'(CV_t^{t-1}C'+R)^{-1}\\ {\bf x}_t^t &= {\bf x}_t^{t-1} + K_t({\bf y}_t-C{\bf x}_t^{t-1})\\ V_t^t &= V_t^{t-1} - K_t C V_t^{t-1}\\ \end{align}
Kalman smoother
\begin{align} J_{t-1} &= V_{t-1}^{t-1}A'(V_t^{t-1})^{-1}\\ {\bf x}_{t-1}^T &= {\bf x}_{t-1}^{t-1} + J_{t-1}({\bf x}_t^T - A{\bf x}_{t-1}^{t-1})\\ V_{t-1}^T &= V_{t-1}^{t-1} + J_{t-1}(V_t^T - V_t^{t-1})J_{t-1}'\\ V_{T,T-1}^T &= (I - K_T C)AV_{T-1}^{T-1}\\ V_{t-1,t-2}^T &= V_{t-1}^{t-1}J_{t-2}' + J_t-1(V_{t,t-1}^T - AV_{t-1}^{t-1})J_{t-2}'\\ \end{align}
これらを用いて,Mステップへの入力を求めます.
\begin{align} {\hat {\bf x}}_t & = {\bf x}_t^T \\ P_t & = V_t^T + {\bf x}_t^T {{\bf x}_t^T}' \\ P_{t,t-1} & = V_{t,t-1}^T + {\bf x}_t^T{{\bf x}_{t-1}^T}' \\ \end{align}
Mステップ
\begin{align} C^{new} & = (\sum_{t=1}^T {\bf y}_t {\hat{\bf x}}_t')(\sum_{t=1}^T P_t)^{-1} \\ R^{new} & = \frac{1}{T}\sum_{t=1}^T({\bf y}_t{\bf y}_t' - C^{new}{\hat{\bf x}}_t{\bf y}_t') \\ A^{new} & = (\sum_{t=2}^T P_{t,t-1})(\sum_{t=2}^T P_{t-1})^{-1} \\ Q^{new} & = \frac{1}{T-1}(\sum_{t=2}^T P_{t} - A^{new}\sum_{t=2}^T P_{t-1,t}) \\ {\bf \pi}_1^{new} & = {\hat {\bf x}}_1 \\ V_1^{new} & = P_1 - {\hat {\bf x}}_1{\hat {\bf x}}_1' \end{align}なおN個の観測列があるときは
\begin{align} {\bar {\hat {\bf x}}}_t & = \frac{1}{N}\sum_{i=1}^N {\hat {\bf x}}_t^{(i)} \\ V_1^{new} & = P_1 - {\bar {\hat {\bf x}}}_1{\bar {\hat {\bf x}}}_1' + \frac{1}{N}\sum_{i=1}^N [{\bar {\hat {\bf x}}}_1^{(i)}-{\bar {\hat {\bf x}}}_1][{\bar {\hat {\bf x}}}_1^{(i)}-{\bar {\hat {\bf x}}}_1]' \\ \end{align}
実験
Wikipediaのカルマンフィルタの例にあるシステムで実験してみました.トロッコが摩擦なしの無限レール上でランダムな力によって動くシステムです.
データを生成するために用意したパラメータは
A [[ 1. 0.1] [ 0. 1. ]] C [[ 1. 0.]] Q [[ 2.50000000e-05 5.00000000e-04] [ 5.00000000e-04 1.00000000e-02]] R [[ 1.]]でした.
が,下のコードによって推定された結果は
A [[ 0.62656567 0.64773696] [ 0.56148647 0.0486408 ]] C [[ 0.19605285 1.14560846]] Q [[ 0.15479875 -0.12660499] [-0.12665761 0.31297647]] R [[ 0.59216228]] pi_1 [[-0.39133729] [-0.89786661]] V_1 [[ 0.18260503 -0.07688508] [-0.07691364 0.20206351]]でした.下のコードでは生成されるデータは毎回異なるので結果はその都度変わります.がどうもうまくいかない.
赤が真の位置,マゼンダが推定された位置.青が真の速度,シアンが推定された速度.
どこがおかしいのだろう(:D)| ̄|_ =3
1. Z. Ghahramani and G.E. Hinton. Parameter estimation for linear dynamical systems. University of Toronto technical report CRG-TR-96-2, 6, 1996.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
''' | |
Parameter Estimation for Linear Dynamical Systems | |
x[t] = A*x[t-1] + w[t] | |
y[t] = C*x[t] + v[t] | |
w[t] ~ N(0,Q) | |
v[t] ~ N(0,R) | |
''' | |
import numpy as np | |
import matplotlib.pyplot as plt | |
def E_step(y, A, C, Q, R, pi_1, V_1): | |
# num of observations | |
T = len(y) | |
# prediction | |
x_p = {} # x_p[t-1] := x_t^{t-1} | |
V_p = {} # V_p[t-1] := V_t^{t-1} | |
K = {} # K[t-1] := K_t | |
# filter | |
x_f = {} # x_f[t-1] := x_t^t | |
V_f = {} # V_f[t-1] := V_t^t | |
# smoother | |
J = {} # J[t-1] := J_{t} | |
x_s = {} # x_s[t-1] := x_{t}^T | |
V_s = {} # V_s[t-1] := V_{t}^T | |
V_a = {} # V_a[t-1] := V_{t,t-1}^T | |
# response | |
x = {} # x[t-1] := {¥hat x}_t | |
P = {} # P[t-1] := P_t | |
P_a = {} # P_a[t-1] := P_{t,t-1} | |
# kalman filter | |
for t in range(T): | |
if t == 0: # initialize | |
x_p[t] = pi_1 | |
V_p[t] = V_1 | |
else: | |
x_p[t] = A * x_f[t-1] | |
V_p[t] = A * V_f[t-1] * A.T + Q | |
K[t] = V_p[t] * C.T * np.linalg.pinv((C * V_p[t] * C.T + R)) | |
x_f[t] = x_p[t] + K[t] * (y[t] - C * x_p[t]) | |
V_f[t] = V_p[t] - K[t] * C * V_p[t] | |
# kalman smoother | |
x_s[T-1] = x_f[T-1] | |
V_s[T-1] = V_f[T-1] | |
for t in range(T-1, 0, -1): | |
J[t-1] = V_f[t-1] * A.T * np.linalg.pinv(V_p[t]) | |
x_s[t-1] = x_f[t-1] + J[t-1] * (x_s[t] - A * x_f[t-1]) | |
V_s[t-1] = V_f[t-1] + J[t-1] * (V_s[t] - V_p[t]) * J[t-1].T | |
I = np.mat(np.eye(*A.shape)) | |
V_a[T-1] = (I - K[T-1] * C) * A * V_f[T-2] | |
for t in range(T-1, 1, -1): | |
V_a[t-1] = V_f[t-1] * J[t-2].T + J[t-1] * (V_a[t] - A * V_f[t-1]) * J[t-2].T | |
# set response | |
for t in range(T): | |
x[t] = x_s[t] | |
P[t] = V_s[t] + x_s[t] * x_s[t].T | |
if t == 0: continue | |
P_a[t] = V_a[t] + x_s[t] * x_s[t-1].T | |
return x, P, P_a | |
def M_step(y, x, P, P_a): | |
# num of observations | |
T = len(y) | |
# Output matrix | |
C = sum([y[t]*x[t].T for t in range(T)]) * np.linalg.pinv(sum([P[t] for t in range(T)])) | |
# Output noise covariance | |
R = sum([y[t]*y[t].T - C * x[t] * y[t].T for t in range(T)]) / T | |
# State dynamics matrix | |
A = sum([P_a[t] for t in range(1,T)]) * np.linalg.pinv(sum([ P[t-1] for t in range(1,T)])) | |
# State noise covariance | |
Q = (sum([P[t] for t in range(1,T)]) - A * sum([P_a[t] for t in range(1,T)])) / (T - 1) | |
#Q = sum( [ (P[t] - A * P_a[t]) for t in range(1,T) ] ) / (T-1) | |
# Initail state mean | |
pi_1 = x[1] | |
# Initial state covariance | |
V_1 = P[1] - x[1] * x[1].T | |
return A, C, Q, R, pi_1, V_1 | |
if __name__ == '__main__': | |
# テストデータ生成 | |
dt = 0.1 | |
x = np.mat([[0.0], | |
[0.0]]) | |
A = np.mat([[1.0,dt], | |
[0.0,1.0]]) | |
C = np.mat([[1.0,0]]) | |
Q = np.mat([[dt**4/4,dt**3/2], | |
[dt**3/2,dt**2]]) | |
R = np.mat([[1.0]]) | |
print "A\n%s\nC\n%s\nQ\n%s\nR\n%s" % (A, C, Q, R) | |
X = [] # 状態 | |
Y = [] # 観測 | |
K = 500 # サンプル数 | |
for i in range(K): | |
x = A * x + np.mat(np.random.multivariate_normal((0,0),Q)).T | |
y = C * x + np.mat(np.random.normal(0,1)).T | |
X.append(x) | |
Y.append(y) | |
# 推定 | |
# 初期値をランダムに振る | |
A = np.mat(np.random.rand(2,2)) | |
C = np.mat(np.random.rand(1,2)) | |
Q = np.mat(np.random.rand(2,2)) | |
Q = (Q + Q.T) / 2 | |
R = np.mat(np.random.rand(1,1)) | |
pi_1 = np.mat(np.random.rand(2,1)) | |
V_1 = np.mat(np.random.rand(2,2)) | |
V_1 = (V_1 + V_1.T) / 2 | |
N = 100 # EM回数 | |
e = E_step(Y, A, C, Q, R, pi_1, V_1) | |
for i in range(100): | |
print i | |
m = M_step(Y, *e) | |
e = E_step(Y, *m) | |
# 結果表示 | |
print "A\n%s\nC\n%s\nQ\n%s\nR\n%s\npi_1\n%s\nV_1\n%s" % m | |
x_hat, pi, pa = e | |
# テストデータ | |
X1 = [] | |
X2 = [] | |
# 推定結果 | |
X3 = [] | |
X4 = [] | |
for x in X: | |
X1.append(x[0,0]) | |
X2.append(x[1,0]) | |
for i in x_hat: | |
X3.append(x_hat[i][0,0]) | |
X4.append(x_hat[i][1,0]) | |
plt.plot(X1, 'r-') | |
plt.plot(X2, 'b-') | |
plt.plot(X3, 'm-') | |
plt.plot(X4, 'c-') | |
plt.show() |