# -*- coding: utf-8 -*-
'''
This is adapted from a tutorial at:
https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/r2/tutorials/text/image_captioning.ipynb
'''
from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
from tensorflow.keras import activations, layers, losses, optimizers

# You'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
from tqdm import tqdm

'''## Download and prepare the MS-COCO dataset

You will use the [MS-COCO dataset](http://cocodataset.org/#home) to train your
model. The dataset contains over 82,000 images, each of which has at least 5
different caption annotations. For computational reasons we only consider the
71,973 captions that have 8 or fewer words, and the 48,659 images they belong
to. We limit this further to the 20,000 images with the most captions, a total
of 43,314 captions. The code below downloads and extracts the dataset
automatically.

**Caution: large download ahead**. The 20,000 images amount to a file of about
3 GB.
'''

annotation_zip = tf.keras.utils.get_file(
    'captions.zip',
    cache_subdir=os.path.abspath('.'),
    #origin='http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
    origin="/studier/emner/matnat/its/TEK5040/h19/data/captions_trainval2014_8_20000.zip?vrtxPreviewUnpublished",
    extract=True
)
annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'

name_of_zip = 'train2014.zip'
if not os.path.exists(os.path.abspath('.') + '/' + name_of_zip):
    image_zip = tf.keras.utils.get_file(
        name_of_zip,
        cache_subdir=os.path.abspath('.'),
        #origin='http://images.cocodataset.org/zips/train2014.zip',
        origin="/studier/emner/matnat/its/TEK5040/h19/data/train2014_8_20000.zip?vrtxPreviewUnpublished",
        extract=True)
    PATH = os.path.dirname(image_zip)+'/train2014/'
else:
    PATH = os.path.abspath('.')+'/train2014/'

'''## Optional: limit the size of the training set

In the beginning you probably want to use only a small subset of the captions,
so that you don't have to wait too long for the cached features to be created.
When you are happy with your code you can scale up to a larger subset, or even
all the captions.

*NOTE*: When changing NUM_EXAMPLES you may have to delete/rename old
checkpoints, as they may not be compatible due to the change in vocabulary
size, and thus in our output layer.
'''
# Read the json file
with open(annotation_file, 'r') as f:
    annotations = json.load(f)

# Store captions and image names in vectors
all_captions = []
all_img_name_vector = []

# We add the special words <start> and <end> to indicate start and end of sentence
for annot in annotations['annotations']:
    caption = '<start> ' + annot['caption'] + ' <end>'
    image_id = annot['image_id']
    full_coco_image_path = PATH + 'COCO_train2014_' + '%012d.jpg' % (image_id)

    all_img_name_vector.append(full_coco_image_path)
    all_captions.append(caption)

# Shuffle captions and image_names together
# Set a random state
captions, img_name_vector = shuffle(all_captions,
                                    all_img_name_vector,
                                    random_state=1)

# Limit to the first NUM_EXAMPLES from the shuffled set
NUM_EXAMPLES = 100
trainval_captions = captions[:NUM_EXAMPLES]
img_name_vector = img_name_vector[:NUM_EXAMPLES]

print("Number of total captions: %d" % len(all_captions))
print("Number of captions used for training and validation: %d" % len(trainval_captions))

'''## Preprocess the images using NASNetMobile

Next, you will use [NASNetMobile](https://keras.io/applications/#nasnet)
(which is pretrained on Imagenet) to process each image. You will extract
features from its last convolutional layer.

First, you will convert the images into the format NASNetMobile expects by:
* Resizing the image to 224px by 224px
* Normalizing the images so that they contain pixels in the range of -1 to 1,
  which matches the format of the images used to train NASNet.
'''

image_height = 224
image_width = 224

def load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, (image_height, image_width))
    img = tf.keras.applications.nasnet.preprocess_input(img)
    return img, image_path

'''## Initialize NASNetMobile and load the pretrained Imagenet weights

Now you'll create a tf.keras model where the output layer is the last
convolutional layer in the NASNetMobile architecture. The shape of the output
of this layer is ```7x7x1056```. You use the last convolutional layer because
you are using attention in this example. You don't perform this initialization
during training because it could become a bottleneck.

* You forward each image through the network and obtain the corresponding
  feature vectors.
* After each image has been passed through the network, you save its features
  to disk (one `.npy` file per image).
'''

image_model = tf.keras.applications.NASNetMobile(include_top=False, weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output

image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

'''## Caching the features extracted from NASNetMobile

You will pre-process each image with NASNetMobile and cache the output to
disk. Caching the output in RAM would be faster but also memory intensive,
requiring 7 \* 7 \* 1056 floats per image. Performance could be improved with
a more sophisticated caching strategy (for example, by sharding the images to
reduce random access disk I/O), but that would require more code.

The caching will take a few minutes to run in Colab with a GPU, but may take
several hours on a laptop without a GPU.
'''
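# A rough back-of-the-envelope check of the memory claim above (illustrative only;
# assumes float32 features and the 20,000-image subset mentioned earlier):
bytes_per_image = 7 * 7 * 1056 * 4                 # 4 bytes per float32, about 0.2 MB
total_gb = 20000 * bytes_per_image / 1024**3
print("Approximate RAM needed to cache all features: %.1f GB" % total_gb)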
# Whether to force recomputation of features even if they exist in the cache;
# useful if e.g. changing the model used to compute the features.
FORCE_FEATURE_COMPUTE = False

# Get unique images
encode_train = sorted(set(img_name_vector))
if not FORCE_FEATURE_COMPUTE:
    encode_train = [p for p in encode_train if not os.path.exists(p+'.npy')]

if len(encode_train) > 0:
    # Feel free to change batch_size according to your system configuration
    image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
    image_dataset = image_dataset.map(load_image, num_parallel_calls=1).batch(1)

    print("Caching features for %d images." % len(encode_train))
    for img, path in tqdm(image_dataset):
        batch_features = image_features_extract_model(img)
        # Collapse the height and width dimensions:
        # (batch_size, 7, 7, 1056) --> (batch_size, 49, 1056)
        batch_features = tf.reshape(batch_features,
                                    (batch_features.shape[0], -1, batch_features.shape[3]))

        for bf, p in zip(batch_features, path):
            path_of_feature = p.numpy().decode("utf-8")
            np.save(path_of_feature, bf.numpy())

'''## Preprocess and tokenize the captions

* First, you'll tokenize the captions (for example, by splitting on spaces).
  This gives us a vocabulary of all of the unique words in the data (for
  example, "surfing", "football", and so on).
* Next, you'll limit the vocabulary size to the top 5,000 words (to save
  memory). You'll replace all other words with the token "<unk>" (unknown).
* You then create word-to-index and index-to-word mappings.
* Finally, you pad all sequences to be the same length as the longest one.
'''

# Find the maximum length of any caption in our dataset
def calc_max_length(tensor):
    return max(len(t) for t in tensor)

# Choose the top 5000 words from the vocabulary
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(trainval_captions)
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Create the tokenized vectors, e.g. hypothetically
# [<start> A man walking his dog <end>] --> [4, 4201, 13, 403, 35, 5, 321]
trainval_seqs = tokenizer.texts_to_sequences(trainval_captions)

# Pad each vector to the max_length of the captions
# If you do not provide a max_length value, pad_sequences calculates it automatically
# e.g. if maxlen was 10 we would get
# [4, 4201, 13, 403, 35, 5, 321] --> [4, 4201, 13, 403, 35, 5, 321, 0, 0, 0]
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(trainval_seqs, padding='post')

# Calculate max_length, which is used to store the attention weights
max_length = calc_max_length(trainval_seqs)
print("Max length of captions in trainval: %d" % max_length)
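# Optional sanity check (illustrative; the exact indices depend on the fitted
# vocabulary): map the first caption to indices and back to words.
example_seq = trainval_seqs[0]
print(trainval_captions[0])
print(example_seq)
print(' '.join(tokenizer.index_word[idx] for idx in example_seq))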
'''## Split the data into training and validation'''

# Create training and validation sets using an 80-20 split
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,
                                                                    cap_vector,
                                                                    test_size=0.2,
                                                                    random_state=0)

print("Number of training captions: %d" % len(cap_train))
print("Number of validation captions: %d" % len(cap_val))

'''## Create a tf.data dataset for training

Our images and captions are ready! Next, let's create a tf.data dataset to use
for training our model.
'''

# Feel free to change these parameters according to your system's configuration
BATCH_SIZE = 16
BUFFER_SIZE = 100
embedding_dim = 256
units = 512
vocab_size = len(tokenizer.word_index) + 1
num_steps = len(img_name_train) // BATCH_SIZE
# Shape of the vector extracted from NASNetMobile is (49, 1056)
# These variables represent that shape
feature_channels = 1056
feature_height = feature_width = 7
attention_features_shape = feature_height * feature_width
spatial_positions = feature_height * feature_width

# Load the numpy files
def map_func(img_name, cap):
    img_tensor = np.load(img_name.decode('utf-8')+'.npy')
    return img_tensor, cap

def create_dataset(img_name, cap, shuffle=False):
    dataset = tf.data.Dataset.from_tensor_slices((img_name, cap))

    # Use map to load the numpy files in parallel
    dataset = dataset.map(lambda item1, item2: tf.numpy_function(
              map_func, [item1, item2], [tf.float32, tf.int32]),
              num_parallel_calls=tf.data.experimental.AUTOTUNE)

    # Shuffle and batch
    if shuffle:
        dataset = dataset.shuffle(BUFFER_SIZE)
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return dataset

dataset_train = create_dataset(img_name_train, cap_train, shuffle=True)
dataset_val = create_dataset(img_name_val, cap_val, shuffle=False)

'''## Model

The model architecture is inspired by the
[Show, Attend and Tell](https://arxiv.org/pdf/1502.03044.pdf) paper.

* In this example, you extract the features from the last convolutional layer
  of NASNetMobile, giving a tensor of shape (7, 7, 1056).
* You squash that to a shape of (49, 1056).
* This tensor is then passed through the CNN Encoder (which consists of a
  single fully connected layer).
* The RNN (here an LSTM) attends over the image features to predict the next
  word.
'''

class UniformAttention(layers.Layer):

    def call(self, feature_vectors, state_output):
        """Note: We do not use state_output."""
        batch_size = tf.shape(feature_vectors)[0]
        num_feature_vectors = tf.shape(feature_vectors)[1]
        attention_weights = tf.ones((batch_size, num_feature_vectors)) / np.float32(num_feature_vectors)
        # Weighted sum over the spatial positions ==> [batch_size, feature_units]
        context_vector = tf.reduce_sum(tf.expand_dims(attention_weights, axis=-1) * feature_vectors, axis=1)

        return context_vector, attention_weights
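# The "dotproduct" and "bahdanau" branches in RNN_Decoder below are left as TODOs.
# For reference only, here is a minimal sketch of additive (Bahdanau-style)
# attention, assuming the same call signature and return shapes as
# UniformAttention. It is not wired into the decoder and is not necessarily the
# intended solution; it merely illustrates the idea from Show, Attend and Tell.
class BahdanauAttentionSketch(layers.Layer):

    def __init__(self, attention_units):
        super(BahdanauAttentionSketch, self).__init__()
        self.W1 = layers.Dense(attention_units)  # projects the feature vectors
        self.W2 = layers.Dense(attention_units)  # projects the decoder state
        self.V = layers.Dense(1)                 # maps to one score per spatial position

    def call(self, feature_vectors, state_output):
        # feature_vectors: (batch_size, num_positions, feature_units)
        # state_output:    (batch_size, decoder_units)
        state_with_time_axis = tf.expand_dims(state_output, 1)
        score = self.V(tf.nn.tanh(self.W1(feature_vectors) + self.W2(state_with_time_axis)))
        # score: (batch_size, num_positions, 1); weights sum to 1 over the positions
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = tf.reduce_sum(attention_weights * feature_vectors, axis=1)
        return context_vector, tf.squeeze(attention_weights, axis=-1)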
class CNN_Encoder(tf.keras.Model):
    # Since you have already extracted the features and cached them to disk,
    # this encoder just passes those features through a fully connected layer
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        # shape after fc == (batch_size, 49, embedding_dim)
        self.fc = layers.Dense(embedding_dim)

    def call(self, x):
        x = self.fc(x)
        x = activations.relu(x)
        return x

class RNN_Decoder(tf.keras.Model):
    def __init__(self, embedding_dim, units, vocab_size, attention_type):
        super(RNN_Decoder, self).__init__()
        self.units = units

        self.embedding = layers.Embedding(vocab_size, embedding_dim)
        self.lstm = layers.LSTMCell(self.units,
                                    recurrent_initializer='glorot_uniform')
        self.fc1 = layers.Dense(self.units)
        self.fc2 = layers.Dense(vocab_size)

        self.attention_type = attention_type.lower()
        if self.attention_type == "uniform":
            self.attention = UniformAttention()
        elif self.attention_type == "dotproduct":
            raise NotImplementedError("TODO")
        elif self.attention_type == "bahdanau":
            raise NotImplementedError("TODO")
        else:
            raise ValueError(
                "attention_type '%s' not recognized. Expected one of %s"
                % (self.attention_type, ["uniform", "dotproduct", "bahdanau"])
            )

        self.get_initial_state = self.lstm.get_initial_state

    def call(self, inputs):
        y, features, state_output, hidden = inputs

        # defining attention as a separate model
        # Hmm, shouldn't we use the previous word to decide where to attend next?
        context_vector, attention_weights = self.attention(features, state_output)

        # y shape after passing through embedding == (batch_size, 1, embedding_dim)
        y = self.embedding(y)

        # x shape after concatenation == (batch_size, features_dim + embedding_dim)
        x = tf.concat([context_vector, tf.squeeze(y, axis=1)], axis=-1)

        # passing the concatenated vector to the LSTM cell
        state_output, state = self.lstm(x, hidden)

        # shape == (batch_size, units)
        x = self.fc1(state_output)

        # output shape == (batch_size, vocab_size)
        x = self.fc2(x)

        return x, state_output, state, attention_weights

attention_type = "uniform"
#attention_type = "dotproduct"
#attention_type = "bahdanau"

encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size, attention_type=attention_type)
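# Optional sanity check (illustrative only, safe to delete): run a single decoder
# step on random features to confirm the shapes the training loop below relies on.
_dummy_features = encoder(tf.random.uniform((2, spatial_positions, feature_channels)))
_dummy_hidden = decoder.get_initial_state(batch_size=2, dtype="float32")
_dummy_words = tf.fill((2, 1), tokenizer.word_index['<start>'])
_logits, _state_out, _state, _att = decoder(
    [_dummy_words, _dummy_features, _dummy_hidden[0], _dummy_hidden])
print("decoder logits shape:", _logits.shape)      # (2, vocab_size)
print("attention weights shape:", _att.shape)      # (2, spatial_positions)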
'''## Loss and optimizer'''

# Note that the learning rate has not been optimized. You may also want to
# implement a decreasing learning rate schedule for optimal performance.
optimizer = optimizers.Adam(learning_rate=0.001)
loss_object = losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    # num_samples is the number of non-padding tokens contributing to the loss
    num_samples = tf.reduce_sum(mask)

    return tf.reduce_sum(loss_), num_samples

'''## Checkpoint'''

train_dir = "train_dir/%s" % attention_type
checkpoint_path = train_dir + "/checkpoints"
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

if ckpt_manager.latest_checkpoint:
    print("Restored weights from {}".format(ckpt_manager.latest_checkpoint))
    ckpt.restore(ckpt_manager.latest_checkpoint)
else:
    print("Initializing random weights.")

start_epoch = 0
if ckpt_manager.latest_checkpoint:
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])

'''## Define some utils for visualization of captions'''

def evaluate(image):
    max_vis = min(max_length, 9)
    attention_plot = np.zeros((max_vis, attention_features_shape))

    hidden = decoder.get_initial_state(batch_size=1, dtype="float32")
    state_out = hidden[0]  # hmm, shouldn't get_initial_state also return this?

    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val,
                                (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_vis):
        predictions, state_out, hidden, attention_weights = decoder([dec_input,
                                                                     features,
                                                                     state_out,
                                                                     hidden])

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        predicted_id = tf.argmax(predictions[0]).numpy()
        result.append(tokenizer.index_word[predicted_id])

        if tokenizer.index_word[predicted_id] == '<end>':
            return result, attention_plot

        dec_input = tf.expand_dims([predicted_id], 0)

    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot

def plot_attention(image, result, attention_plot):
    temp_image = np.array(Image.open(image))

    fig = Figure(figsize=(10, 10))
    canvas = FigureCanvas(fig)

    len_result = len(result)
    num_cols = int(np.ceil(np.sqrt(len_result)))
    num_rows = (len_result + num_cols - 1) // num_cols
    for l in range(len_result):
        temp_att = np.resize(attention_plot[l], (feature_height, feature_width))
        ax = fig.add_subplot(num_rows, num_cols, l+1)
        ax.set_title(result[l])
        img = ax.imshow(temp_image)
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    fig.tight_layout()
    canvas.draw()
    x = np.frombuffer(canvas.tostring_rgb(), dtype='uint8')
    width, height = fig.get_size_inches() * fig.get_dpi()
    width = int(np.round(width))
    height = int(np.round(height))
    x = np.reshape(x, [height, width, 3])
    return x

'''## Training

* You extract the features stored in the respective `.npy` files and then pass
  those features through the encoder.
* The encoder output, the hidden state (initialized to 0) and the decoder
  input (which is the start token) are passed to the decoder.
* The decoder returns the predictions and the decoder hidden state.
* The decoder hidden state is then passed back into the model and the
  predictions are used to calculate the loss.
* Use teacher forcing to decide the next input to the decoder: teacher forcing
  is the technique where the target word is passed as the next input to the
  decoder.
* The final step is to calculate the gradients, apply them with the optimizer
  and backpropagate.
'''
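# Training and validation losses, plus the attention visualizations, are written
# as TF summaries below; you can monitor them with TensorBoard, e.g.
#   tensorboard --logdir train_dir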
train_writer = tf.summary.create_file_writer(train_dir + "/train", flush_millis=3000)
val_writer = tf.summary.create_file_writer(train_dir + "/val", flush_millis=3000)

def train_step(img_tensor, target):
    loss = 0
    num_samples = 0

    batch_size = target.shape[0]
    hidden = decoder.get_initial_state(batch_size=batch_size, dtype="float32")
    state_out = hidden[0]

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)

        for i in range(1, target.shape[1]):
            # passing the features through the decoder
            predictions, state_out, hidden, _ = decoder([dec_input,
                                                         features,
                                                         state_out,
                                                         hidden])

            loss_t, num_samples_t = loss_function(target[:, i], predictions)
            loss += loss_t
            num_samples += num_samples_t

            # using teacher forcing
            dec_input = tf.expand_dims(target[:, i], 1)

    # weight the loss of each sample in the batch equally
    average_loss = loss / num_samples

    trainable_variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(average_loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, num_samples

def val_step(img_tensor, target):
    """Similar to the train step, except that we don't calculate gradients and
    update variables.
    """
    loss = 0
    num_samples = 0

    batch_size = target.shape[0]
    hidden = decoder.get_initial_state(batch_size=batch_size, dtype="float32")
    state_out = hidden[0]

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)

    features = encoder(img_tensor)

    for i in range(1, target.shape[1]):
        # passing the features through the decoder
        predictions, state_out, hidden, _ = decoder([dec_input,
                                                     features,
                                                     state_out,
                                                     hidden])

        loss_t, num_samples_t = loss_function(target[:, i], predictions)
        loss += loss_t
        num_samples += num_samples_t

        # using teacher forcing
        dec_input = tf.expand_dims(target[:, i], 1)

    return loss, num_samples

def val_loss():
    """Calculate validation loss for the entire validation set."""
    start = time.time()
    total_loss = 0
    total_samples = 0

    for (batch, (img_tensor, target)) in enumerate(dataset_val):
        loss, samples = val_step(img_tensor, target)
        total_loss += loss
        total_samples += samples

    average_loss_epoch = total_loss / total_samples
    print('Validation time: {} sec\n'.format(time.time() - start))

    return average_loss_epoch

EPOCHS = 10
summary_interval = 10
step = num_steps * start_epoch
num_summary_images = 5
checkpoint_every_n_epochs = 1

for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0
    total_samples = 0

    summary_images = img_name_val[:num_summary_images]
    for idx, image in enumerate(summary_images):
        result, attention_plot = evaluate(image)
        x = plot_attention(image, result, attention_plot)
        x = tf.expand_dims(tf.convert_to_tensor(x), 0)
        with val_writer.as_default():
            tf.summary.image("image_%d" % idx, x, step=step)

    for (batch, (img_tensor, target)) in enumerate(dataset_train):
        loss, samples = train_step(img_tensor, target)
        total_loss += loss
        total_samples += samples
        step += 1

        if batch % 100 == 0:
            # NOTE: this loss will have high variance
            print('Epoch {} Batch {} Loss {:.4f}'.format(
                epoch + 1, batch, loss.numpy() / samples.numpy()))

    with train_writer.as_default():
        tf.summary.scalar("loss", total_loss/total_samples, step=step)

    # storing the epoch end loss value to plot later
    average_loss_epoch = total_loss / total_samples

    # do validation
    val_l = val_loss()
    with val_writer.as_default():
        tf.summary.scalar("loss", val_l, step=step)

    if (epoch+1) % checkpoint_every_n_epochs == 0:
        print("Checkpointing model after %d epochs of training." % (epoch+1))
        ckpt_manager.save(epoch+1)

    print('Epoch {} Loss {:.6f}'.format(epoch + 1, average_loss_epoch))
    print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
print("Checkpointing model after %d epochs of training." % (epoch+1)) ckpt_manager.save(epoch+1) print('Epoch {} Loss {:.6f}'.format(epoch + 1, average_loss_epoch)) print('Time taken for 1 epoch {} sec\n'.format(time.time() - start)) '''## Try it on your own images Keep in mind, it was trained on a relatively small amount of data, and your images may be different from the training data (so be prepared for weird results!) ''' image_url = 'https://tensorflow.org/images/surf.jpg' image_extension = image_url[-4:] image_path = tf.keras.utils.get_file('image'+image_extension, origin=image_url) result, attention_plot = evaluate(image_path) print ('Prediction Caption:', ' '.join(result)) x = plot_attention(image_path, result, attention_plot) plt.imshow(x) plt.show() '''## Save models You can also restore model from checkpoints, but then you have to first build your model with the code from this script and use a checkpointmanager to load the weights. An often more convenient method is to use the `model.save` method, to save both the model and the weights. We need to call `model._set_inputs` when we haven't used the `model.predict` or `model.fit` functions. ''' encoder._set_inputs(tf.keras.Input([spatial_positions, feature_channels])) encoder.save(os.path.join(train_dir, "encoder.hd5")) #decoder._set_inputs( # [ # tf.keras.Input([vocab_size]), # predicted word # tf.keras.Input([spatial_positions, embedding_dim]), # embedded spatial features # tf.keras.Input([units]), # output LSTM # [tf.keras.Input([units]), tf.keras.Input([units])] # hidden LSTM state # ] #) #decoder.save(os.path.join(train_dir, "decoder.hd5")) # Currently having some issues with setting inputs like this. Thus we save # weights only for now, which makes it a bit more complicated to load the # model. decoder.save_weights(os.path.join(train_dir, "decoder.hd5")) # In e.g. a different script you may now load the models in this way # encoder = tf.keras.models.load_model("/path/to/encoder.hd5") # decoder = RNNDecoder(same params used for saved model) # decoder.load_weights("/path/to/decoder.hd5")