1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| # 以Cell方式实现GRU # %% import os
import numpy as np import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers, losses, optimizers, Sequential
# 指定GPU os.environ["CUDA_VISIBLE_DEVICES"] = "2"
tf.random.set_seed(22) np.random.seed(22) os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' assert tf.__version__.startswith('2.')
batch_size = 128 # 批量大小 total_words = 10000 # 词汇表大小N_vocab max_review_len = 80 # 句子最大长度s,大于的句子部分将截断,小于的将填充 embedding_len = 100 # 词向量特征长度f # 加载IMDB数据集,此处的数据采用数字编码,一个数字代表一个单词 (x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=total_words) print(x_train.shape, len(x_train[0]), y_train.shape) print(x_test.shape, len(x_test[0]), y_test.shape) # %% print(x_train[0]) # %% # 数字编码表 word_index = keras.datasets.imdb.get_word_index() # for k,v in word_index.items(): # print(k,v) # %% word_index = {k: (v + 3) for k, v in word_index.items()} word_index["<PAD>"] = 0 word_index["<START>"] = 1 word_index["<UNK>"] = 2 # unknown word_index["<UNUSED>"] = 3 # 翻转编码表 reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
def decode_review(text): return ' '.join([reverse_word_index.get(i, '?') for i in text])
decode_review(x_train[8])
# %%
# x_train:[b, 80] # x_test: [b, 80] # 截断和填充句子,使得等长,此处长句子保留句子后面的部分,短句子在前面填充 x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_review_len) x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_review_len) # 构建数据集,打散,批量,并丢掉最后一个不够batch_size的batch db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train)) db_train = db_train.shuffle(1000).batch(batch_size, drop_remainder=True) db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test)) db_test = db_test.batch(batch_size, drop_remainder=True) print('x_train shape:', x_train.shape, tf.reduce_max(y_train), tf.reduce_min(y_train)) print('x_test shape:', x_test.shape)
# %%
class MyRNN(keras.Model): # Cell方式构建多层网络 def __init__(self, units): super(MyRNN, self).__init__() # [b, 64],构建Cell初始化状态向量,重复使用 # GRU与RNN相同 隐藏层状态为1个 LSTM两个 self.state0 = [tf.zeros([batch_size, units])] self.state1 = [tf.zeros([batch_size, units])] # 词向量编码 [b, 80] => [b, 80, 100] self.embedding = layers.Embedding(total_words, embedding_len, input_length=max_review_len) # 构建2个Cell self.rnn_cell0 = layers.GRUCell(units, dropout=0.5) self.rnn_cell1 = layers.GRUCell(units, dropout=0.5) # 构建分类网络,用于将CELL的输出特征进行分类,2分类 # [b, 80, 100] => [b, 64] => [b, 1] self.out_layer = Sequential([ layers.Dense(units), layers.Dropout(rate=0.5), layers.ReLU(), layers.Dense(1)])
def call(self, inputs, training=None, mask=None): x = inputs # [b, 80] # embedding: [b, 80] => [b, 80, 100] x = self.embedding(x) # rnn cell compute,[b, 80, 100] => [b, 64] state0 = self.state0 state1 = self.state1 out1 = None for word in tf.unstack(x, axis=1): # word: [b, 100] out0, state0 = self.rnn_cell0(word, state0, training) out1, state1 = self.rnn_cell1(out0, state1, training) # 末层最后一个输出作为分类网络的输入: [b, 64] => [b, 1] x = self.out_layer(out1, training) # p(y is pos|x) prob = tf.sigmoid(x)
return prob
def main(): units = 64 # RNN状态向量长度f epochs = 50 # 训练epochs
model = MyRNN(units) # 装配 model.compile(optimizer=optimizers.RMSprop(0.001), loss=losses.BinaryCrossentropy(), metrics=['accuracy']) # 训练和验证 model.fit(db_train, epochs=epochs, validation_data=db_test) # 测试 model.evaluate(db_test)
if __name__ == '__main__': main()
|