2016-08-02 32 views
10

我对这个主题相当陌生,我正在研究一个处理时间序列数据异常检测的项目。我想使用TensorFlow,以便我可以将模型部署到移动设备上。我很难找到TensorFlow中实现的异常检测算法的相关材料和示例。是否有使用TensorFlow实现异常检测算法的示例?

我正在研究的一些算法是用于对窗口化样本和Holt-Winters进行流数据分类的聚类算法。

任何例子都能帮助我!

回答

14

以下是使用Holt-Winters进行顺序过滤的示例。相同的模式应该适用于其他类型的顺序建模,如卡尔曼滤波器。

from matplotlib import pyplot 
import numpy as np 
import tensorflow as tf 

tf.logging.set_verbosity(tf.logging.INFO) 

seasonality = 10 

def model_fn(features, targets): 
    """Defines a basic Holt-Winters sequential filtering model in TensorFlow. 

    See http://www.itl.nist.gov/div898/handbook/pmc/section4/pmc435.htm""" 
    times = features["times"] 
    values = features["values"] 
    # Initial estimates 
    initial_trend = tf.reduce_sum(
     (values[seasonality:2*seasonality] - values[:seasonality]) 
    /seasonality ** 2) 
    initial_smoothed_observation = values[0] 
    # Seasonal indices are multiplicative, so having them near 0 leads to 
    # instability 
    initial_seasonal_indices = 1. + tf.exp(
     tf.get_variable("initial_seasonal_indices", shape=[seasonality])) 
    with tf.variable_scope("smoothing_parameters", 
         initializer=tf.zeros_initializer): 
    # Trained scalars for smoothing, transformed to be in (0, 1) 
    observation_smoothing = tf.sigmoid(
     tf.get_variable(name="observation_smoothing", shape=[])) 
    trend_smoothing = tf.sigmoid(
     tf.get_variable(name="trend_smoothing", shape=[])) 
    seasonal_smoothing = tf.sigmoid(
     tf.get_variable(name="seasonal_smoothing", shape=[])) 
    def filter_function(
     current_index, seasonal_indices, previous_smoothed_observation, 
     previous_trend, previous_loss_sum): 
    current_time = tf.gather(times, current_index) 
    current_observation = tf.gather(values, current_index) 
    current_season = current_time % seasonality 
    one_step_ahead_prediction = (
     (previous_smoothed_observation + previous_trend) 
     * tf.gather(seasonal_indices, current_season)) 
    new_loss_sum = previous_loss_sum + (
     one_step_ahead_prediction - current_observation) ** 2 
    new_smoothed_observation = (
     (observation_smoothing * current_observation 
     /tf.gather(seasonal_indices, current_season)) 
     + ((1. - observation_smoothing) 
      * (previous_smoothed_observation + previous_trend))) 
    new_trend = (
     (trend_smoothing 
     * (new_smoothed_observation - previous_smoothed_observation)) 
     + (1. - trend_smoothing) * previous_trend) 
    updated_seasonal_index = (
     seasonal_smoothing * current_observation/new_smoothed_observation 
     + ((1. - seasonal_smoothing) 
      * tf.gather(seasonal_indices, current_season))) 
    new_seasonal_indices = tf.concat(
     concat_dim=0, 
     values=[seasonal_indices[:current_season], 
       [updated_seasonal_index], 
       seasonal_indices[current_season + 1:]]) 
    # Preserve shape to keep the while_loop shape invariants happy 
    new_seasonal_indices.set_shape(seasonal_indices.get_shape()) 
    return (current_index + 1, new_seasonal_indices, new_smoothed_observation, 
      new_trend, new_loss_sum) 
    def while_run_condition(current_index, *unused_args): 
    return current_index < tf.shape(times)[0] 
    (_, final_seasonal_indices, final_smoothed_observation, final_trend, 
    sum_squared_errors) = tf.while_loop(
     cond=while_run_condition, 
     body=filter_function, 
     loop_vars=[0, initial_seasonal_indices, initial_smoothed_observation, 
       initial_trend, 0.]) 
    normalized_loss = sum_squared_errors/tf.cast(tf.shape(times)[0], 
               dtype=tf.float32) 
    train_op = tf.contrib.layers.optimize_loss(
     loss=normalized_loss, 
     global_step=tf.contrib.framework.get_global_step(), 
     learning_rate=0.1, 
     optimizer="Adam") 
    prediction_times = tf.range(30) 
    prediction_values = (
     (final_smoothed_observation + final_trend * tf.cast(prediction_times, 
                  dtype=tf.float32)) 
     * tf.cast(tf.gather(params=final_seasonal_indices, 
          indices=prediction_times % seasonality), 
       dtype=tf.float32)) 
    predictions = {"times": prediction_times, 
       "values": prediction_values} 
    return predictions, normalized_loss, train_op 

# Create a synthetic time series with seasonality, trend, and a little noise 
series_length = 50 
times = np.arange(series_length, dtype=np.int32) 
values = 5. + (
    0.02 * times + np.sin(times * 2 * np.pi/float(seasonality)) 
    + np.random.normal(size=[series_length], scale=0.2)).astype(np.float32) 
# Define an input function to feed the data into our model 
input_fn = lambda: ({"times":tf.convert_to_tensor(times, dtype=tf.int32), 
        "values":tf.convert_to_tensor(values, dtype=tf.float32)}, 
        {}) 
# Wrap the model in a tf.learn Estimator for training and inference 
estimator = tf.contrib.learn.Estimator(model_fn=model_fn) 
estimator.fit(input_fn=input_fn, steps=500) 
predictions = estimator.predict(input_fn=input_fn, as_iterable=False) 
# Plot the training data and predictions 
pyplot.plot(range(series_length), values) 
pyplot.plot(series_length + predictions["times"], predictions["values"]) 
pyplot.show() 

(我用的是TensorFlow 0.11.0rc0写这个的时候)

Output of Holt-Winters on synthetic data: training data followed by predictions.

然而,扩展到较长的时间序列时,该代码将是相当缓慢的。问题是TensorFlow(以及大多数其他自动分化工具)在顺序计算(循环)方面没有很好的性能。通常这通过批量处理数据并在大块上运行来改善。对于顺序模型来说有点棘手,因为有一个状态需要从一个时间步转移到另一个时间步。

更快(但也许不那么令人满意)的方法是使用自回归模型。这具有很容易在TensorFlow实现额外的好处:

import numpy as np 
from matplotlib import pyplot 
import tensorflow as tf 

tf.logging.set_verbosity(tf.logging.INFO) 

seasonality = 10 

# Create a synthetic time series with seasonality, trend, and a little noise 
series_length = 50 
times = np.arange(series_length, dtype=np.int32) 
values = 5. + (0.02 * times + np.sin(times * 2 * np.pi/float(seasonality)) 
       + np.random.normal(size=[series_length], scale=0.2)).astype(
        np.float32) 
# Parameters for stochastic gradient descent 
batch_size = 16 
window_size = 10 
# Define a column format for the linear regression 
input_column = tf.contrib.layers.real_valued_column(column_name="input_window", 
                dimension=window_size) 
def training_input_fn(): 
    window_starts = tf.random_uniform(shape=[batch_size], dtype=tf.int32, 
            maxval=series_length - window_size - 1) 
    element_indices = (tf.expand_dims(window_starts, 1) 
        + tf.expand_dims(tf.range(window_size), 0)) 
    return ({input_column: tf.gather(values, element_indices)}, 
      tf.gather(values, window_starts + window_size)) 
estimator = tf.contrib.learn.LinearRegressor(feature_columns=[input_column]) 
estimator.fit(input_fn=training_input_fn, steps=500) 
predictions = list(values[-10:]) 
def predict_input_fn(): 
    return ({input_column: tf.reshape(predictions[-10:], [1, 10])}, {}) 
predict_length = 30 
for i in xrange(predict_length): 
    prediction = estimator.predict(input_fn=predict_input_fn, as_iterable=False) 
    predictions.append(prediction[0]) 
predictions = predictions[10:] 
pyplot.plot(range(series_length), values) 
pyplot.plot(series_length + np.arange(predict_length), predictions) 
pyplot.show() 

Output of the autoregressive model on the same synthetic dataset.

注意,因为该模型没有状态保持,我们可以很容易做到小批量随机梯度下降。

对于群集,类似k-means可能适用于时间序列。

相关问题