In [64]:
from os import listdir
from pickle import dump
from keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from keras.models import Model

In [299]:
# extract feature from each photo in directory
def extract_feature(directory):
 model = VGG16()
 #restructure model, here we remove last softmax layer from this model
 model.layers.pop
 model = Model(inputs=model.inputs, outputs=model.layers[-1].output)
 print(model.summary)
 
 #extract feature from each photo
 feature = dict()
 for name in listdir(directory):
 filename = directory + '/' + name
 image = load_img(filename, target_size=(224,224))
 #convert img pixels to numpy array
 image = img_to_array(image)
 #reshape data for the model
 image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
 #preprocess img for preprocess model
 image = preprocess_input(image)
 #get features
 features = model.predict(image, verbose=0)
 #get img id
 img_id = name.split('.')[0]
 #storing features
 feature[img_id] = features
 print(">%s" % name)
 return feature

In [None]:
directory = "img_captioning_dataset/Images"
features = extract_feature(directory)
# print("Extracted Features: %d" %len(features))
# dump(features, open('img_captioning_features/features.pkl', 'wb'))

In [385]:
import string
from nltk.tokenize import word_tokenize

def load_doc(filename):
 # open the file as read only
 file = open(filename, 'r')
 # read all text
 text = file.read()
 # close the file
 file.close()
 return text

#extract description of image
def load_description(doc):
 mapping = dict()
 for line in doc.split('\n'):
 token = word_tokenize(line)
 if len(line) < 2:
 continue
 image_id, image_desc = token[0], token[1:]
 image_id = image_id.split('.')[0]
 image_desc = ' '.join(image_desc)
 if image_id not in mapping:
 mapping[image_id] = list()
 mapping[image_id].append(image_desc)
 return mapping

In [386]:
def clean_descriptions(descriptions):
 # prepare translation table for removing punctuation
 table = str.maketrans('', '', string.punctuation)
 for key, desc_list in descriptions.items():
 for i in range(len(desc_list)):
 desc = desc_list[i]
 # tokenize
 desc = desc.split()
 # convert to lower case
 desc = [word.lower() for word in desc]
 # remove punctuation from each token
 desc = [w.translate(table) for w in desc]
 # remove hanging 's' and 'a'
 desc = [word for word in desc if len(word)>1]
 # remove tokens with numbers in them
 desc = [word for word in desc if word.isalpha()]
 # store as string
 desc_list[i] = ' '.join(desc)
def to_vocabulary(descriptions):
 # build a list of all description strings
 all_desc = set()
 for key in descriptions.keys():
 [all_desc.update(d.split()) for d in descriptions[key]]
 return all_desc

In [387]:
def save_descriptions(descriptions, filename):
 lines = list()
 for key, desc_list in descriptions.items():
 for desc in desc_list:
 lines.append(key + " " + desc)
 data = '\n'.join(lines)
 file = open(filename, 'w')
 file.write(data)
 file.close()

In [388]:
filename = "Flickr8k.token.txt"
doc = load_doc(filename)
descriptions = load_description(doc)
print("Loaded: %d" %len(descriptions))

Loaded: 8092


In [389]:
#clean desc
clean_descriptions(descriptions)
vocab = to_vocabulary(descriptions)
print("Vocab size: %d" %len(vocab))

Vocab size: 8761


In [390]:
save_descriptions(descriptions, "another-way/descriptions1.txt")

### Extract Identifier

In [281]:
from pickle import dump

In [391]:
#load into memory
def load_doc(filename):
 with open(filename, 'r') as f:
 content = f.read()
 return content
#pre-defined list of photo identifier
def load_set(filename):
 doc = load_doc(filename)
 dataset = list()
 for line in doc.split("\n"):
 if len(line) < 1:
 continue
 identifier = line.split('.')[0]
 dataset.append(identifier)
 return set(dataset)

In [392]:
def load_clean_descripitions(filename, dataset):
 doc = load_doc(filename)
 descriptions = dict()
 for line in doc.split('\n'):
 tokens = word_tokenize(line)
 image_id, image_desc = tokens[0], tokens[1:]
 if image_id in dataset:
 if image_id not in descriptions:
 descriptions[image_id] = list()
 #wrap description in token
 desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
 descriptions[image_id].append(desc)
 return descriptions

In [393]:
def load_photo_features(features, dataset):
 all_features = load(open(features, 'rb'))
 features = {k: all_features[k] for k in dataset if k in all_features}
 return features

In [394]:
from pickle import load

features = "Flickr_8k.trainImages.txt"
train = load_set(features)
print("dataset: %d" %len(train))
train_descriptions = load_clean_descripitions("descriptions1.txt", train)
print("Descriptions: train=%d" %len(train_descriptions))
train_features = load_photo_features("features.pkl", train)
print('Photos: train=%d' % len(train_features))

dataset: 6000
Descriptions: train=6000
Photos: train=6000


In [444]:
train_descriptions

{'1191338263_a4fa073154': ['startseq little old lady sitting next to an advertisement endseq',
 'startseq an asian woman waiting at an underground train stop endseq',
 'startseq an old woman sits in transit station next to backlit advertisement endseq',
 'startseq woman sits in subway station endseq',
 'startseq woman with an umbrella is sitting at station with an aquos commercial on the wall endseq'],
 '218342358_1755a9cce1': ['startseq cyclist wearing red helmet is riding on the pavement endseq',
 'startseq girl is riding bike on the street while wearing red helmet endseq',
 'startseq person on bike wearing red helmet riding down street endseq',
 'startseq woman wears red helmet and blue shirt as she goes for bike ride in the shade endseq',
 'startseq person in blue shirt and red helmet riding bike down the road endseq'],
 '2187222896_c206d63396': ['startseq boy in red shirt in front of long blue wall raises his eyebrow at the camera endseq',
 'startseq boy in red shirt with stripes 

### Now going to use keras tokenizer to change text to numeric form

In [396]:
# dict to clean list
def to_lines(descriptions):
 all_desc = list()
 for key in descriptions.keys():
 [all_desc.append(d) for d in descriptions[key]]
 return all_desc

In [397]:
def create_tokenizer(descriptions):
 lines = to_lines(descriptions)
 tokenizer = Tokenizer()
 tokenizer.fit_on_texts(lines)
 return tokenizer

In [398]:
tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)

Vocabulary Size: 7577


In [399]:
#len of description
def max_length(description):
 lines = to_lines(description)
 return max(len(d.split()) for d in lines)

In [462]:
# create input and output sequence
def create_sequences(tokenizer, max_length, desc_list, photo):
 X1, X2, y = list(), list(), list()
 # walk through each description for the image
 for desc in desc_list:
 # encode the sequence
 seq = tokenizer.texts_to_sequences([desc])[0]
 # split one sequence into multiple X,y pairs
 for i in range(1, len(seq)):
 # split into input and output pair
 in_seq, out_seq = seq[:i], seq[i]
 # pad input sequence
 in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
 # encode output sequence
 out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]
 # store
 X1.append(photo)
 X2.append(in_seq)
 y.append(out_seq)
 return array(X1), array(X2), array(y)

In [401]:
from numpy import array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical, plot_model
from keras.models import Model
from keras.layers import Input, Dense, Activation, Dropout, Embedding,LSTM, Bidirectional, BatchNormalization
from keras.layers.merging import add
from keras.callbacks import ModelCheckpoint

### Model creation

In [467]:
def define_model(vocab_size, max_length):
 # feature extractor model
 inputs1 = Input(shape=(1000,))
 fe1 = Dropout(0.5)(inputs1)
 fe2 = Dense(256, activation='relu')(fe1)
 # sequence model
 inputs2 = Input(shape=(max_length,))
 se1 = Embedding(vocab_size,output_dim=256, mask_zero=True)(inputs2)
 se2 = Dropout(0.5)(se1)
 se3 = LSTM(256)(se2)
 # decoder model
 decoder1 = concatenate([fe2, se3])
 decoder2 = Dense(256, activation='relu')(decoder1)
 outputs = Dense(vocab_size, activation='softmax')(decoder2)
 # tie it together [image, seq] [word]
 model = Model(inputs=[inputs1, inputs2], outputs=outputs)
 model.compile(loss='categorical_crossentropy', optimizer='adam')
 # summarize model
 print(model.summary())
 return model

In [463]:
# load batch of data
def data_generator(descriptions, photos, tokenizer, max_length):
 # loop for ever over images
 while 1:
 for key, desc_list in descriptions.items():
 # retrieve the photo feature
 photo = photos[key][0]
 in_img, in_seq, out_word = create_sequences(tokenizer, max_length, desc_list, photo)
 yield [[in_img, in_seq], out_word]

In [464]:
#load train dataset
import tensorflow as tf
filename = "Flickr_8k.trainImages.txt"
train = load_set(filename)
print("Dataset: %d" %len(train))

train_descriptions = load_clean_descripitions("descriptions1.txt", train)
print("train_descriptions= %d" %len(train_descriptions))

train_feature = load_photo_features("features.pkl", train)
print("photos: train= %d" %len(train_feature))

tokenizer = create_tokenizer(train_descriptions)
vocab_size = len(tokenizer.word_index)+1
print("Vocab size: %d" %vocab_size)

max_len = max_length(train_descriptions)
print('Description Length: %d' % max_len)

Dataset: 6000
train_descriptions= 6000
photos: train= 6000
Vocab size: 7577
Description Length: 34


In [468]:
#train model
model = define_model(vocab_size, max_len)
epochs = 10
steps = len(train_descriptions)

Model: "model_47"
__________________________________________________________________________________________________
 Layer (type) Output Shape Param # Connected to 
 input_121 (InputLayer) [(None, 34)] 0 [] 
 
 input_120 (InputLayer) [(None, 1000)] 0 [] 
 
 embedding_52 (Embedding) (None, 34, 256) 1939712 ['input_121[0][0]'] 
 
 dropout_109 (Dropout) (None, 1000) 0 ['input_120[0][0]'] 
 
 dropout_110 (Dropout) (None, 34, 256) 0 ['embedding_52[0][0]'] 
 
 dense_151 (Dense) (None, 256) 256256 ['dropout_109[0][0]'] 
 
 lstm_52 (LSTM) (None, 256) 525312 ['dropout_110[0][0]'] 
 
 concatenate_8 (Concatenate) (None, 512) 0 ['dense_151[0][0]', 
 'lstm_52[0][0]'] 
 
 dense_152 (Dense) (None, 256) 131328 ['concatenate_8[0][0]'] 
 
 dense_153 (Dense) (None, 7577) 1947289 ['dense_152[0][0]'] 
 
Total params: 4,799,897
Trainable params: 4,799,897
Non-trainable params: 0
__________________________________________________________________________________________________
None


In [469]:
for i in range(epochs):
 #create data generator
 generator = data_generator(train_descriptions, train_feature, tokenizer, max_len)
 model.fit(generator, epochs=1, steps_per_epoch = steps, verbose=1)
 model.save("model_" + str(i) + "h5")

 514/6000 [=>............................] - ETA: 25:52 - loss: 5.8238

KeyboardInterrupt: 

In [None]:
# from tensorflow.keras.callbacks import ModelCheckpoint

# # Define the number of epochs and steps
# epochs = 10
# steps_per_epoch = len(train_descriptions)

# # Create a data generator
# generator = data_generator(train_descriptions, train_feature, tokenizer, max_len)

# # Define a checkpoint callback to save the model after each epoch
# checkpoint = ModelCheckpoint(filepath="model_{epoch}.h5", save_weights_only=False, save_format="h5")

# # Train the model for the specified number of epochs
# model.fit(generator, epochs=epochs, steps_per_epoch=steps_per_epoch, verbose=1, callbacks=[checkpoint])

In [2]:
def word_for_id(interger, tokenizer):
 for word, index in tokenizer.word_index.items():
 if index==interger:
 return word
 return None
def generate_desc(model, tokenizer, photo, max_len):
 in_text = "start_seq"
 for i in range(max_len):
 sequence = tokenizer.texts_to_sequences([in_text])[0]
 sequence = pad_sequence([sequence], maxlen = max_len)
 yhat = model.predict([photo, sequence], verbose=1)
 yhat = argmax(yhat)
 word = word_for_id(yhat, tokenizer)
 if word is None:
 break
 in_text += ' '+word
 if word=='endseq':
 break
 return in_text

In [3]:
# evaluated the skill of model
def evaluate_model(model, description, photos, tokenizer, max_length):
 actual, predicted = list(), list()
 for key, desc_list in description.items():
 yhat = generate_desc(model, tokenizer, photos[key], max_length)
 references = [d.split() for d in desc_list]
 actual.append(yhat.split())
 predicted.append(yhat.split())
print("BLUE-1: %f" %corpus_bleu(actual, predicted, weights=(1.0,0,0,0)))
print("BLUE-2: %f" %corpus_bleu(actual, predicted, weights=(0.5,0.5,0,0)))
print("BLUE-3: %f" %corpus_bleu(actual, predicted, weights=(0.3,0.3,0.3,0)))
print("BLUE-4: %f" %corpus_bleu(actual, predicted, weights=(0.25,0.25,0.25,0.25)))

NameError: name 'corpus_bleu' is not defined