Python es un lenguaje de secuencias de comandos dinámico. No solo tiene un sistema de tipo dinámico donde una variable puede asignarse primero a un tipo y cambiarse después, sino que su modelo de objetos también es dinámico. Esto nos permite modificar su comportamiento en tiempo de ejecución. Una consecuencia de esto es la posibilidad de parchear monos. Esta es una idea de que podemos modificar la capa base de un programa sin modificar el código de nivel superior. Imagina que puedes usar el print()
para imprimir algo en la pantalla, y podemos modificar la definición de esta función para imprimirlo en un archivo sin modificar ninguna línea de su código.
Es posible porque Python es un lenguaje interpretado, por lo que podemos realizar cambios mientras se ejecuta el programa. Podemos hacer uso de esta propiedad en Python para modificar la interfaz de una clase o un módulo. Es útil si estamos tratando con código heredado o código de otras personas en el que no queremos modificarlo mucho pero queremos hacerlo funcionar con diferentes versiones de bibliotecas o entornos. En este tutorial, vamos a ver cómo podemos aplicar esta técnica a algunos códigos de Keras y TensorFlow.
Después de terminar este tutorial, aprenderá:
- ¿Qué es el parche de mono?
- Cómo cambiar un objeto o un módulo en Python en tiempo de ejecución
Empecemos.
Código de Python de parches de mono. Foto de Juan Rumimpunu. Algunos derechos reservados.
Descripción general del tutorial
Este tutorial consta de tres partes; ellos son:
- Un modelo, dos interfaces
- Extender un objeto con parches de mono
- Aplicación de parches mono para revivir el código heredado
Un modelo, dos interfaces
TensorFlow es una biblioteca enorme. Proporciona una API Keras de alto nivel para describir modelos de aprendizaje profundo en capas. También viene con un montón de funciones para el entrenamiento, como diferentes optimizadores y generadores de datos. Es abrumador instalar TensorFlow solo porque necesitamos ejecutar nuestro modelo entrenado. Por lo tanto, TensorFlow nos proporciona una contraparte llamada TensorFlow Lite que es mucho más pequeño en tamaño y adecuado para ejecutarse en dispositivos pequeños como dispositivos móviles o integrados.
Queremos mostrar cómo el modelo TensorFlow Keras original y el modelo TensorFlow Lite se usan de manera diferente. Así que hagamos un modelo de tamaño moderado, como el modelo LeNet-5. A continuación se muestra cómo cargamos el conjunto de datos MNIST y entrenamos un modelo para la clasificación:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping
# Load MNIST data
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# Reshape data to shape of (n_sample, height, width, n_channel)
X_train = np.expand_dims(X_train, axis=3).astype(‘float32’)
X_test = np.expand_dims(X_test, axis=3).astype(‘float32’)
# LeNet5 model: ReLU can be used intead of tanh
model = Sequential([
Conv2D(6, (5,5), input_shape=(28,28,1), padding=»same», activation=»tanh»),
AveragePooling2D((2,2), strides=2),
Conv2D(16, (5,5), activation=»tanh»),
AveragePooling2D((2,2), strides=2),
Conv2D(120, (5,5), activation=»tanh»),
Flatten(),
Dense(84, activation=»tanh»),
Dense(10, activation=»softmax»)
])
# Training
model.compile(loss=»sparse_categorical_crossentropy», optimizer=»adam», metrics=[«sparse_categorical_accuracy»])
earlystopping = EarlyStopping(monitor=»val_loss», patience=4, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping])
Al ejecutar el código anterior, se descargará el conjunto de datos MNIST mediante la API del conjunto de datos de TensorFlow y se entrenará el modelo. Después, podemos guardar el modelo:
model.save(«lenet5-mnist.h5»)
O podemos evaluar el modelo con nuestro conjunto de prueba:
print(np.argmax(model.predict(X_test), axis=1))
print(y_test)
Entonces deberíamos ver:
[7 2 1 … 4 5 6][7 2 1 … 4 5 6]
Pero si pretendemos usarlo con TensorFlow Lite, queremos convertirlo al formato TensorFlow Lite de la siguiente manera:
# tflite conversion with dynamic range optimization
import tensorflow as tf
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
# Optional: Save the data for testing
import numpy as np
np.savez(‘mnist-test.npz’, X=X_test, y=y_test)
# Save the model.
with open(‘lenet5-mnist.tflite’, ‘wb’) as f:
f.write(tflite_model)
Podemos agregar más opciones al convertidor, como reducir el modelo para usar un punto flotante de 16 bits. Pero en todos los casos, la salida de la conversión es una cadena binaria. La conversión no solo reducirá el modelo a un tamaño mucho más pequeño (en comparación con el tamaño del archivo HDF5 guardado de Keras), sino que también nos permitirá usarlo con una biblioteca liviana. Hay bibliotecas para dispositivos móviles Android e iOS. Si está utilizando Linux incorporado, puede encontrar el tflite-runtime
módulo del repositorio de PyPI (o puede compilar uno desde el código fuente de TensorFlow). A continuación se muestra cómo podemos usar tflite-runtime
para ejecutar el modelo convertido:
import numpy as np
import tflite_runtime.interpreter as tflite
loaded = np.load(‘mnist-test.npz’)
X_test = loaded[«X»]
y_test = loaded[«y»]
interpreter = tflite.Interpreter(model_path=»lenet5-mnist.tflite»)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(input_details[0][‘shape’])
rows = []
for n in range(len(X_test)):
# this model has single input and single output
interpreter.set_tensor(input_details[0][‘index’], X_test[n:n+1])
interpreter.invoke()
row = interpreter.get_tensor(output_details[0][‘index’])
rows.append(row)
rows = np.vstack(rows)
accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)
De hecho, la biblioteca más grande de TensorFlow también puede ejecutar el modelo convertido con una sintaxis muy similar:
import numpy as np
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=»lenet5-mnist.tflite»)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
rows = []
for n in range(len(X_test)):
# this model has single input and single output
interpreter.set_tensor(input_details[0][‘index’], X_test[n:n+1])
interpreter.invoke()
row = interpreter.get_tensor(output_details[0][‘index’])
rows.append(row)
rows = np.vstack(rows)
accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)
Tenga en cuenta las diferentes formas de usar los modelos: En el modelo Keras, tenemos el predict()
función que toma un lote como entrada y devuelve un resultado. En el modelo TensorFlow Lite, sin embargo, tenemos que inyectar un tensor de entrada a la vez al «intérprete» e invocarlo, luego recuperar el resultado.
Poniendo todo junto, el siguiente código es cómo construimos un modelo Keras, lo entrenamos, lo convertimos al formato TensorFlow Lite y lo probamos con el modelo convertido:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, AveragePooling2D, Dropout, Flatten
from tensorflow.keras.callbacks import EarlyStopping
# Load MNIST data
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# Reshape data to shape of (n_sample, height, width, n_channel)
X_train = np.expand_dims(X_train, axis=3).astype(‘float32’)
X_test = np.expand_dims(X_test, axis=3).astype(‘float32’)
# LeNet5 model: ReLU can be used intead of tanh
model = Sequential([
Conv2D(6, (5,5), input_shape=(28,28,1), padding=»same», activation=»tanh»),
AveragePooling2D((2,2), strides=2),
Conv2D(16, (5,5), activation=»tanh»),
AveragePooling2D((2,2), strides=2),
Conv2D(120, (5,5), activation=»tanh»),
Flatten(),
Dense(84, activation=»tanh»),
Dense(10, activation=»softmax»)
])
# Training
model.compile(loss=»sparse_categorical_crossentropy», optimizer=»adam», metrics=[«sparse_categorical_accuracy»])
earlystopping = EarlyStopping(monitor=»val_loss», patience=4, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, batch_size=32, callbacks=[earlystopping])
# Save model
model.save(«lenet5-mnist.h5″)
# Compare the prediction vs test data
print(np.argmax(model.predict(X_test), axis=1))
print(y_test)
# tflite conversion with dynamic range optimization
import tensorflow as tf
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
# Optional: Save the data for testing
import numpy as np
np.savez(‘mnist-test.npz’, X=X_test, y=y_test)
# Save the tflite model.
with open(‘lenet5-mnist.tflite’, ‘wb’) as f:
f.write(tflite_model)
# Load the tflite model and run test
interpreter = tf.lite.Interpreter(model_path=»lenet5-mnist.tflite»)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
rows = []
for n in range(len(X_test)):
# this model has single input and single output
interpreter.set_tensor(input_details[0][‘index’], X_test[n:n+1])
interpreter.invoke()
row = interpreter.get_tensor(output_details[0][‘index’])
rows.append(row)
rows = np.vstack(rows)
accuracy = np.sum(np.argmax(rows, axis=1) == y_test) / len(y_test)
print(accuracy)
Ampliación de un objeto con Monkey Patching
Podemos usar predict()
en el intérprete de TensorFlow Lite?
El objeto intérprete no tiene tal función. Pero como estamos usando Python, es posible que lo agreguemos usando el parche de mono técnica. Para entender lo que estamos haciendo, primero, tenemos que notar que el interpreter
El objeto que definimos en el código anterior puede contener muchos atributos y funciones. cuando llamamos interpreter.predict()
como una función, Python buscará la que tenga ese nombre dentro del objeto y luego la ejecutará. Si no se encuentra ese nombre, Python generará el AttributeError
excepción:
…
interpreter.predict()
Eso da:
Traceback (most recent call last):
File «/Users/MLM/pred_error.py», line 13, in <module>
interpreter.predict()
AttributeError: ‘Interpreter’ object has no attribute ‘predict’
Para que esto funcione, necesitamos agregar una función al interpreter
objeto con el nombre predict
, y que debería comportarse como uno cuando se invoca. Para simplificar las cosas, notamos que nuestro modelo es secuencial con una matriz como entrada y devuelve una matriz de resultados de softmax como salida. Entonces podemos escribir un predict()
función que se comporta como la del modelo de Keras, pero usando el intérprete de TensorFlow Lite:
…
# Monkey patching the tflite model
def predict(self, input_batch):
batch_size = len(input_batch)
output = []
input_details = self.get_input_details()
output_details = self.get_output_details()
# Run each sample from the batch
for sample in range(batch_size):
self.set_tensor(input_details[0][«index»], input_batch[sample:sample+1])
self.invoke()
sample_output = self.get_tensor(output_details[0][«index»])
output.append(sample_output)
# vstack the output of each sample
return np.vstack(output)
interpreter.predict = predict.__get__(interpreter)
La última línea de arriba asigna la función que creamos al interpreter
objeto, con el nombre predict
. Él __get__(interpreter)
se requiere parte para hacer una función que definimos para convertirse en una función miembro del objeto interpreter
.
Con estos, ahora podemos ejecutar un lote:
…
out_proba = interpreter.predict(X_test)
out = np.argmax(out_proba, axis=1)
print(out)
accuracy = np.sum(out == y_test) / len(y_test)
print(accuracy)
[7 2 1 … 4 5 6]
0.9879
Esto es posible porque Python tiene un modelo de objeto dinámico. Podemos modificar atributos o funciones miembro de un objeto en tiempo de ejecución. De hecho, esto no debería sorprendernos. Un modelo de Keras necesita ejecutarse model.compile()
antes de que podamos correr model.fit()
. Un efecto de la función de compilación es agregar el atributo loss
al modelo para mantener la función de pérdida. Esto se logra en tiempo de ejecución.
Con el predict()
función añadida a la interpreter
objeto, podemos pasar alrededor del interpreter
objeto como un modelo de Keras entrenado para la predicción. Si bien son diferentes detrás de escena, comparten la misma interfaz para que otras funciones puedan usarla sin modificar ninguna línea de código.
A continuación se muestra el código completo para cargar nuestro modelo TensorFlow Lite guardado, luego parchear el mono predict()
función para que se vea como un modelo de Keras:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
# Load MNIST data and reshape
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = np.expand_dims(X_train, axis=3).astype(‘float32’)
X_test = np.expand_dims(X_test, axis=3).astype(‘float32’)
# Monkey patching the tflite model
def predict(self, input_batch):
batch_size = len(input_batch)
output = []
input_details = self.get_input_details()
output_details = self.get_output_details()
# Run each sample from the batch
for sample in range(batch_size):
self.set_tensor(input_details[0][«index»], input_batch[sample:sample+1])
self.invoke()
sample_output = self.get_tensor(output_details[0][«index»])
output.append(sample_output)
# vstack the output of each sample
return np.vstack(output)
# Load and monkey patch
interpreter = tf.lite.Interpreter(model_path=»lenet5-mnist.tflite»)
interpreter.predict = predict.__get__(interpreter)
interpreter.allocate_tensors()
# test output
out_proba = interpreter.predict(X_test)
out = np.argmax(out_proba, axis=1)
print(out)
accuracy = np.sum(out == y_test) / len(y_test)
print(accuracy)
Monkey Patching para revivir el código heredado
Podemos dar un ejemplo más de parches de mono en Python. Considere el siguiente código:
# https://machinelearningmastery.com/dropout-regularization-deep-learning-models-keras/
# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasClassifier
from keras.constraints import maxnorm
from keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# load dataset
dataframe = read_csv(«sonar.csv», header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)
# dropout in hidden layers with weight constraint
def create_model():
# create model
model = Sequential()
model.add(Dense(60, input_dim=60, activation=’relu’, kernel_constraint=maxnorm(3)))
model.add(Dropout(0.2))
model.add(Dense(30, activation=’relu’, kernel_constraint=maxnorm(3)))
model.add(Dropout(0.2))
model.add(Dense(1, activation=’sigmoid’))
# Compile model
sgd = SGD(lr=0.1, momentum=0.9)
model.compile(loss=»binary_crossentropy», optimizer=sgd, metrics=[‘accuracy’])
return model
estimators = []
estimators.append((‘standardize’, StandardScaler()))
estimators.append((‘mlp’, KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print(«Hidden: %.2f%% (%.2f%%)» % (results.mean()*100, results.std()*100))
Este código se escribió hace algunos años y supone una versión anterior de Keras con TensorFlow 1.x. el archivo de datos sonar.csv
se encuentra en la otra publicación. Si ejecutamos este código con TensorFlow 2.5, veremos el problema de un ImportError
en la linea de SGD
. Necesitamos hacer dos cambios como mínimo en el código anterior para que se ejecute:
- Las funciones y clases deben importarse desde
tensorflow.keras
en vez dekeras
- La clase de restricción
maxnorm
debe estar en caso de camello,MaxNorm
El siguiente es el código actualizado, en el que modificamos solo las declaraciones de importación:
# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.constraints import MaxNorm as maxnorm
from tensorflow.keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# load dataset
dataframe = read_csv(«sonar.csv», header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)
# dropout in hidden layers with weight constraint
def create_model():
# create model
model = Sequential()
model.add(Dense(60, input_dim=60, activation=’relu’, kernel_constraint=maxnorm(3)))
model.add(Dropout(0.2))
model.add(Dense(30, activation=’relu’, kernel_constraint=maxnorm(3)))
model.add(Dropout(0.2))
model.add(Dense(1, activation=’sigmoid’))
# Compile model
sgd = SGD(lr=0.1, momentum=0.9)
model.compile(loss=»binary_crossentropy», optimizer=sgd, metrics=[‘accuracy’])
return model
estimators = []
estimators.append((‘standardize’, StandardScaler()))
estimators.append((‘mlp’, KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print(«Hidden: %.2f%% (%.2f%%)» % (results.mean()*100, results.std()*100))
Si tenemos un proyecto mucho más grande con muchos scripts, sería tedioso modificar cada línea de importación. Pero el sistema de módulos de Python es solo un diccionario en sys.modules
. Por lo tanto, podemos parchearlo para que el código antiguo encaje con la nueva biblioteca. Lo siguiente es cómo lo hacemos. Esto funciona para las instalaciones de TensorFlow 2.5 (este problema de compatibilidad con versiones anteriores del código Keras se solucionó en TensorFlow 2.9; por lo tanto, no necesita este parche en la última versión de las bibliotecas):
# monkey patching
import sys
import tensorflow.keras
tensorflow.keras.constraints.maxnorm = tensorflow.keras.constraints.MaxNorm
for x in sys.modules.keys():
if x.startswith(«tensorflow.keras»):
sys.modules[x[len(«tensorflow.»):]] = sys.modules[x]
# Old code below:
# Example of Dropout on the Sonar Dataset: Hidden Layer
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.wrappers.scikit_learn import KerasClassifier
from keras.constraints import maxnorm
from keras.optimizers import SGD
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# load dataset
dataframe = read_csv(«sonar.csv», header=None)
dataset = dataframe.values
# split into input (X) and output (Y) variables
X = dataset[:,0:60].astype(float)
Y = dataset[:,60]
# encode class values as integers
encoder = LabelEncoder()
encoder.fit(Y)
encoded_Y = encoder.transform(Y)
# dropout in hidden layers with weight constraint
def create_model():
# create model
model = Sequential()
model.add(Dense(60, input_dim=60, activation=’relu’, kernel_constraint=maxnorm(3)))
model.add(Dropout(0.2))
model.add(Dense(30, activation=’relu’, kernel_constraint=maxnorm(3)))
model.add(Dropout(0.2))
model.add(Dense(1, activation=’sigmoid’))
# Compile model
sgd = SGD(lr=0.1, momentum=0.9)
model.compile(loss=»binary_crossentropy», optimizer=sgd, metrics=[‘accuracy’])
return model
estimators = []
estimators.append((‘standardize’, StandardScaler()))
estimators.append((‘mlp’, KerasClassifier(build_fn=create_model, epochs=300, batch_size=16, verbose=0)))
pipeline = Pipeline(estimators)
kfold = StratifiedKFold(n_splits=10, shuffle=True)
results = cross_val_score(pipeline, X, encoded_Y, cv=kfold)
print(«Hidden: %.2f%% (%.2f%%)» % (results.mean()*100, results.std()*100))
Este definitivamente no es un código limpio y ordenado, y será un problema para el mantenimiento futuro. Por lo tanto, los parches mono no son bienvenidos en el código de producción. Sin embargo, esta sería una técnica rápida que explota el mecanismo interno del lenguaje Python para hacer que algo funcione fácilmente.
Lecturas adicionales
Esta sección proporciona más recursos sobre el tema si desea profundizar más.
Artículos
Resumen
En este tutorial, aprendimos qué es el parche de mono y cómo hacerlo. Específicamente,
- Aprendimos cómo agregar una función miembro a un objeto existente
- Cómo modificar el caché del módulo de Python en
sys.modules
para engañar a laimport
declaraciones