Loading web-font TeX/Main/Bold

February 17, 2012

Pythonで線形動的システムのパラメータ推定を実装してみるもなんかうまくいってない

線形動的システム(線形力学系,Linear dynamical systems; LDS)のEMアルゴリズムによるパラメータ推定を行いました.が,うまくいってない気がします.カルマンフィルタなどで扱う状態方程式(外部入力なし)と観測方程式において,状態と観測が与えられたときに,係数を求めるのが目的です.Ghahramaniらの論文1の手法を実装します.

状態方程式と観測方程式が
\begin{align} {\bf x}_{t+1}&=A{\bf x}_t + {\bf w}_t\\ {\bf y}_{t}&=C{\bf x}_t + {\bf v}_t\\ {\bf w}_t &\sim N(0,Q)\\ {\bf v}_t &\sim N(0,R)\\ \end{align}
というように表され,\{{\bf y}_1,...,{\bf y}_T\}が与えられたときに,A,C,Q,Rと初期状態の平均ベクトル\pi_1と分散共分散行列V_1を推定します.

論文がとても分かりやすいので,実装に必要なところだけ抜粋します.


Eステップ

以下のように定義します.
\begin{align} {\bf x}_t^{\tau}&=E({\bf x}_t|\{{\bf y}_1^{\tau}\})\\ V_t^{\tau}&=Var({\bf x}_t|\{{\bf y}_1^{\tau}\})\\ \end{align}

Kalman filter
\begin{align} {\bf x}_1^0 &= {\bf \pi}_1\\ V_1^0 &= V_1\\ {\bf x}_t^{t-1} &= A {\bf x}_{t-1}^{t-1}\\ V_t^{t-1} &= A V_{t-1}^{t-1} A' + Q\\ K_t &= V_t^{t-1}C'(CV_t^{t-1}C'+R)^{-1}\\ {\bf x}_t^t &= {\bf x}_t^{t-1} + K_t({\bf y}_t-C{\bf x}_t^{t-1})\\ V_t^t &= V_t^{t-1} - K_t C V_t^{t-1}\\ \end{align}

Kalman smoother
\begin{align} J_{t-1} &= V_{t-1}^{t-1}A'(V_t^{t-1})^{-1}\\ {\bf x}_{t-1}^T &= {\bf x}_{t-1}^{t-1} + J_{t-1}({\bf x}_t^T - A{\bf x}_{t-1}^{t-1})\\ V_{t-1}^T &= V_{t-1}^{t-1} + J_{t-1}(V_t^T - V_t^{t-1})J_{t-1}'\\ V_{T,T-1}^T &= (I - K_T C)AV_{T-1}^{T-1}\\ V_{t-1,t-2}^T &= V_{t-1}^{t-1}J_{t-2}' + J_t-1(V_{t,t-1}^T - AV_{t-1}^{t-1})J_{t-2}'\\ \end{align}

これらを用いて,Mステップへの入力を求めます.
\begin{align} {\hat {\bf x}}_t & = {\bf x}_t^T \\ P_t & = V_t^T + {\bf x}_t^T {{\bf x}_t^T}' \\ P_{t,t-1} & = V_{t,t-1}^T + {\bf x}_t^T{{\bf x}_{t-1}^T}' \\ \end{align}


Mステップ
\begin{align} C^{new} & = (\sum_{t=1}^T {\bf y}_t {\hat{\bf x}}_t')(\sum_{t=1}^T P_t)^{-1} \\ R^{new} & = \frac{1}{T}\sum_{t=1}^T({\bf y}_t{\bf y}_t' - C^{new}{\hat{\bf x}}_t{\bf y}_t') \\ A^{new} & = (\sum_{t=2}^T P_{t,t-1})(\sum_{t=2}^T P_{t-1})^{-1} \\ Q^{new} & = \frac{1}{T-1}(\sum_{t=2}^T P_{t} - A^{new}\sum_{t=2}^T P_{t-1,t}) \\ {\bf \pi}_1^{new} & = {\hat {\bf x}}_1 \\ V_1^{new} & = P_1 - {\hat {\bf x}}_1{\hat {\bf x}}_1' \end{align}
なおN個の観測列があるときは
\begin{align} {\bar {\hat {\bf x}}}_t & = \frac{1}{N}\sum_{i=1}^N {\hat {\bf x}}_t^{(i)} \\ V_1^{new} & = P_1 - {\bar {\hat {\bf x}}}_1{\bar {\hat {\bf x}}}_1' + \frac{1}{N}\sum_{i=1}^N [{\bar {\hat {\bf x}}}_1^{(i)}-{\bar {\hat {\bf x}}}_1][{\bar {\hat {\bf x}}}_1^{(i)}-{\bar {\hat {\bf x}}}_1]' \\ \end{align}



実験

Wikipediaのカルマンフィルタの例にあるシステムで実験してみました.トロッコが摩擦なしの無限レール上でランダムな力によって動くシステムです.

データを生成するために用意したパラメータは
A
[[ 1.   0.1]
 [ 0.   1. ]]
C
[[ 1.  0.]]
Q
[[  2.50000000e-05   5.00000000e-04]
 [  5.00000000e-04   1.00000000e-02]]
R
[[ 1.]]
でした.

が,下のコードによって推定された結果は
A
[[ 0.62656567  0.64773696]
 [ 0.56148647  0.0486408 ]]
C
[[ 0.19605285  1.14560846]]
Q
[[ 0.15479875 -0.12660499]
 [-0.12665761  0.31297647]]
R
[[ 0.59216228]]
pi_1
[[-0.39133729]
 [-0.89786661]]
V_1
[[ 0.18260503 -0.07688508]
 [-0.07691364  0.20206351]]
でした.下のコードでは生成されるデータは毎回異なるので結果はその都度変わります.がどうもうまくいかない.


赤が真の位置,マゼンダが推定された位置.青が真の速度,シアンが推定された速度.


どこがおかしいのだろう(:D)| ̄|_ =3


1. Z. Ghahramani and G.E. Hinton. Parameter estimation for linear dynamical systems. University of Toronto technical report CRG-TR-96-2, 6, 1996.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Parameter Estimation for Linear Dynamical Systems
x[t] = A*x[t-1] + w[t]
y[t] = C*x[t] + v[t]
w[t] ~ N(0,Q)
v[t] ~ N(0,R)
'''
import numpy as np
import matplotlib.pyplot as plt
def E_step(y, A, C, Q, R, pi_1, V_1):
# num of observations
T = len(y)
# prediction
x_p = {} # x_p[t-1] := x_t^{t-1}
V_p = {} # V_p[t-1] := V_t^{t-1}
K = {} # K[t-1] := K_t
# filter
x_f = {} # x_f[t-1] := x_t^t
V_f = {} # V_f[t-1] := V_t^t
# smoother
J = {} # J[t-1] := J_{t}
x_s = {} # x_s[t-1] := x_{t}^T
V_s = {} # V_s[t-1] := V_{t}^T
V_a = {} # V_a[t-1] := V_{t,t-1}^T
# response
x = {} # x[t-1] := {¥hat x}_t
P = {} # P[t-1] := P_t
P_a = {} # P_a[t-1] := P_{t,t-1}
# kalman filter
for t in range(T):
if t == 0: # initialize
x_p[t] = pi_1
V_p[t] = V_1
else:
x_p[t] = A * x_f[t-1]
V_p[t] = A * V_f[t-1] * A.T + Q
K[t] = V_p[t] * C.T * np.linalg.pinv((C * V_p[t] * C.T + R))
x_f[t] = x_p[t] + K[t] * (y[t] - C * x_p[t])
V_f[t] = V_p[t] - K[t] * C * V_p[t]
# kalman smoother
x_s[T-1] = x_f[T-1]
V_s[T-1] = V_f[T-1]
for t in range(T-1, 0, -1):
J[t-1] = V_f[t-1] * A.T * np.linalg.pinv(V_p[t])
x_s[t-1] = x_f[t-1] + J[t-1] * (x_s[t] - A * x_f[t-1])
V_s[t-1] = V_f[t-1] + J[t-1] * (V_s[t] - V_p[t]) * J[t-1].T
I = np.mat(np.eye(*A.shape))
V_a[T-1] = (I - K[T-1] * C) * A * V_f[T-2]
for t in range(T-1, 1, -1):
V_a[t-1] = V_f[t-1] * J[t-2].T + J[t-1] * (V_a[t] - A * V_f[t-1]) * J[t-2].T
# set response
for t in range(T):
x[t] = x_s[t]
P[t] = V_s[t] + x_s[t] * x_s[t].T
if t == 0: continue
P_a[t] = V_a[t] + x_s[t] * x_s[t-1].T
return x, P, P_a
def M_step(y, x, P, P_a):
# num of observations
T = len(y)
# Output matrix
C = sum([y[t]*x[t].T for t in range(T)]) * np.linalg.pinv(sum([P[t] for t in range(T)]))
# Output noise covariance
R = sum([y[t]*y[t].T - C * x[t] * y[t].T for t in range(T)]) / T
# State dynamics matrix
A = sum([P_a[t] for t in range(1,T)]) * np.linalg.pinv(sum([ P[t-1] for t in range(1,T)]))
# State noise covariance
Q = (sum([P[t] for t in range(1,T)]) - A * sum([P_a[t] for t in range(1,T)])) / (T - 1)
#Q = sum( [ (P[t] - A * P_a[t]) for t in range(1,T) ] ) / (T-1)
# Initail state mean
pi_1 = x[1]
# Initial state covariance
V_1 = P[1] - x[1] * x[1].T
return A, C, Q, R, pi_1, V_1
if __name__ == '__main__':
# テストデータ生成
dt = 0.1
x = np.mat([[0.0],
[0.0]])
A = np.mat([[1.0,dt],
[0.0,1.0]])
C = np.mat([[1.0,0]])
Q = np.mat([[dt**4/4,dt**3/2],
[dt**3/2,dt**2]])
R = np.mat([[1.0]])
print "A\n%s\nC\n%s\nQ\n%s\nR\n%s" % (A, C, Q, R)
X = [] # 状態
Y = [] # 観測
K = 500 # サンプル数
for i in range(K):
x = A * x + np.mat(np.random.multivariate_normal((0,0),Q)).T
y = C * x + np.mat(np.random.normal(0,1)).T
X.append(x)
Y.append(y)
# 推定
# 初期値をランダムに振る
A = np.mat(np.random.rand(2,2))
C = np.mat(np.random.rand(1,2))
Q = np.mat(np.random.rand(2,2))
Q = (Q + Q.T) / 2
R = np.mat(np.random.rand(1,1))
pi_1 = np.mat(np.random.rand(2,1))
V_1 = np.mat(np.random.rand(2,2))
V_1 = (V_1 + V_1.T) / 2
N = 100 # EM回数
e = E_step(Y, A, C, Q, R, pi_1, V_1)
for i in range(100):
print i
m = M_step(Y, *e)
e = E_step(Y, *m)
# 結果表示
print "A\n%s\nC\n%s\nQ\n%s\nR\n%s\npi_1\n%s\nV_1\n%s" % m
x_hat, pi, pa = e
# テストデータ
X1 = []
X2 = []
# 推定結果
X3 = []
X4 = []
for x in X:
X1.append(x[0,0])
X2.append(x[1,0])
for i in x_hat:
X3.append(x_hat[i][0,0])
X4.append(x_hat[i][1,0])
plt.plot(X1, 'r-')
plt.plot(X2, 'b-')
plt.plot(X3, 'm-')
plt.plot(X4, 'c-')
plt.show()

1 comment:

  1. hi,
    did you solve the problem? if yes where was the problem in your code?

    ReplyDelete