July 29, 2010

Pythonで隠れマルコフモデルのSmoothingの例

Pythonで隠れマルコフモデルのFilteringの例のSmoothingバージョン.

スムージング
  • 現在まで(0~T)のすべての観測があるときに,過去の状態(t;0<t<T)を推定する.
  • i.e. P(Xt|Y0:T)を求める.
Filtering同様,P(Xt|Y0:T)を変形する.

cは正規化項.P(Xt|Y0:t)はフィルタリング(Forward Algorithm)で求められる.P(Yt+1|Xt+1)は出力確率.P(Xt+1|Xt)は遷移確率.で,再帰的にP(Yt+1:T|Xt)を求められる.以下実装.
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 状態変数
states = ['Rainy', 'Sunny']

# 観測列
observations = ['walk', 'shop', 'clean', 'shop', 'walk']

# 初期確率
start_probability = {'Rainy': 0.6, 'Sunny': 0.4}
 
# 遷移確率
transition_probability = {
   'Rainy' : {'Rainy': 0.7, 'Sunny': 0.3},
   'Sunny' : {'Rainy': 0.4, 'Sunny': 0.6},
   }

# 出力確率 
emission_probability = {
   'Rainy' : {'walk': 0.1, 'shop': 0.4, 'clean': 0.5},
   'Sunny' : {'walk': 0.6, 'shop': 0.3, 'clean': 0.1},
   }

# Forward
def forward(y1, prior, states, trans_p, emit_p):
    post = {}
    for x1 in states:
        post[x1] = emit_p[x1][y1] * sum([ trans_p[x][x1] * prior[x] for x in states ])
    s = sum(post.values())
    for k,v in post.items():
        post[k] = v/s
    return post

# Filtering
def filtering(observations, states, start_p, trans_p, emit_p):
    prior = start_p
    T = len(observations)
    for t in range(T):
        post = forward( observations[t], prior, states, trans_p, emit_p )
        prior = post
    return post

# Backward
def backward(y1, prior, states, trans_p, emit_p):
    post = {}
    for x in states:
        post[x] = sum([ emit_p[x1][y1] * prior[x1] * trans_p[x][x1] for x1 in states ])
    s = sum(post.values())
    for k,v in post.items():
        post[k] = v/s
    return post

# Smoothing
def smoothing(observations, t, states, start_p, trans_p, emit_p):
    T = len(observations)
    # forward
    f = filtering(observations[:t], states, start_p, trans_p, emit_p)
    # backward
    b = {}
    for x in states:
        b[x] = 1.0 / len(states)
    for i in range(T-1, t-1,-1):
        b = backward(observations[i], b, states, trans_p, emit_p) 
    # product
    results = {}
    for x in states:
        results[x] = f[x] * b[x]
    # normalization
    s = sum(results.values())
    for k,v in results.items():
        results[k] = v/s
    return results

print "Smoothing", smoothing(observations, 3, states, start_probability, transition_probability, emission_probability)
print "Filtering", filtering(observations[:3], states, start_probability, transition_probability, emission_probability)
実行結果は
Smoothing {'Rainy': 0.85679684480228568, 'Sunny': 0.14320315519771437}
Filtering {'Rainy': 0.86342066067036916, 'Sunny': 0.1365793393296309}
['walk', 'shop', 'clean', 'shop', 'walk']という観測列があったときの3日目(cleanをした日)のスムージングの結果とフィルタリングの結果を表している.フィルタリングでは4日目にshopして5日目にwalkしたという情報も使っているので若干Rainyの確率が下がっている.

ちなみに,4日目も5日目もcleanだった場合(観測列が['walk', 'shop', 'clean', 'clean', 'clean'])の3日目の推定結果は
Smoothing {'Rainy': 0.90669272213214924, 'Sunny': 0.093307277867850785}
Filtering {'Rainy': 0.86342066067036916, 'Sunny': 0.1365793393296309}
となり雨の確率が上がる.

4日目も5日目もwalkだった場合は(観測列が['walk', 'shop', 'clean', 'walk', 'walk'])の3日目の推定結果は
Smoothing {'Rainy': 0.78605072114831254, 'Sunny': 0.21394927885168744}
Filtering {'Rainy': 0.86342066067036916, 'Sunny': 0.1365793393296309}
となり雨の確率は下がる.