Using collective learning with Keras
This tutorial is a simple guide to trying out the collective learning protocol with your own machine learning code. Everything runs locally.
The most flexible way to use the collective learning backends is to make a class that implements
the Collective Learning MachineLearningInterface
defined in ml_interface.py.
For more details on how to use the MachineLearningInterface
see here
However, the simpler way is to use one of the helper classes that we have provided that implement
most of the interface for popular ML libraries.
In this tutorial we are going to walk through using the KerasLearner.
First we are going to define the model architecture, then
we are going to load the data and configure the model, and then we will run Collective Learning.
A standard script for machine learning with Keras looks like the one below:
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import tensorflow as tf
import tensorflow_datasets as tfds
from colearn_keras.utils import normalize_img
# Training configuration
n_rounds = 20
width = 28
height = 28
n_classes = 10
l_rate = 0.001
batch_size = 64

# Load the data
train_dataset, info = tfds.load('mnist', split='train', as_supervised=True, with_info=True)
n_train = info.splits['train'].num_examples
test_dataset = tfds.load('mnist', split='test', as_supervised=True)

# Normalise, shuffle over the whole training set, then batch
train_dataset = train_dataset.map(normalize_img,
                                  num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset = train_dataset.shuffle(n_train)
train_dataset = train_dataset.batch(batch_size)

# The test set is normalised and batched, but not shuffled
test_dataset = test_dataset.map(normalize_img,
                                num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size)

# Define the model
input_img = tf.keras.Input(shape=(width, height, 1), name="Input")
x = tf.keras.layers.Conv2D(64, (3, 3), activation="relu", padding="same", name="Conv1_1")(input_img)
x = tf.keras.layers.BatchNormalization(name="bn1")(x)
x = tf.keras.layers.MaxPooling2D((2, 2), name="pool1")(x)
x = tf.keras.layers.Conv2D(128, (3, 3), activation="relu", padding="same", name="Conv2_1")(x)
x = tf.keras.layers.BatchNormalization(name="bn4")(x)
x = tf.keras.layers.MaxPooling2D((2, 2), name="pool2")(x)
x = tf.keras.layers.Flatten(name="flatten")(x)
x = tf.keras.layers.Dense(n_classes, activation="softmax", name="fc1")(x)
model = tf.keras.Model(inputs=input_img, outputs=x)

# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
opt = tf.keras.optimizers.Adam(learning_rate=l_rate)
model.compile(
    loss="sparse_categorical_crossentropy",
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    optimizer=opt)

# Train and evaluate model
# (the loop variable is not called `round`, to avoid shadowing the builtin)
for round_index in range(n_rounds):
    model.fit(train_dataset, steps_per_epoch=40)
    result = model.evaluate(x=test_dataset, return_dict=True, steps=10)
    print(f"Performance at round {round_index} is {result}")
There are three steps:
- Load the data
- Define the model
- Train the model
In this tutorial we are going to see how to modify each step to use collective learning. We'll end up with code like this:
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import os
import tensorflow as tf
import tensorflow_datasets as tfds
from colearn.training import initial_result, collective_learning_round, set_equal_weights
from colearn.utils.plot import ColearnPlot
from colearn.utils.results import Results, print_results
from colearn_keras.keras_learner import KerasLearner
from colearn_keras.utils import normalize_img
"""
MNIST training example using Keras
Used dataset:
- MNIST is set of 60 000 black and white hand written digits images of size 28x28x1 in 10 classes
What script does:
- Loads MNIST dataset from Keras
- Sets up a Keras learner
- Randomly splits dataset between multiple learners
- Does multiple rounds of learning process and displays plot with results
"""
# Collective learning settings
n_learners = 5
vote_threshold = 0.5
vote_batches = 2

# Any non-empty COLEARN_EXAMPLES_TEST value shortens the run to a single round
testing_mode = bool(os.getenv("COLEARN_EXAMPLES_TEST", ""))  # for testing
n_rounds = 1 if testing_mode else 20

# Model / optimiser settings
width = 28
height = 28
n_classes = 10
l_rate = 0.001
batch_size = 64

# Load data for each learner
train_dataset, info = tfds.load('mnist', split='train', as_supervised=True, with_info=True)
n_datapoints = info.splits['train'].num_examples
train_datasets = [
    train_dataset.shard(num_shards=n_learners, index=shard_id)
    for shard_id in range(n_learners)
]

# The test split is cut into 2 * n_learners shards: the first n_learners
# become the vote sets, the remaining n_learners become the test sets.
test_dataset = tfds.load('mnist', split='test', as_supervised=True)
n_eval_shards = 2 * n_learners
vote_datasets = [
    test_dataset.shard(num_shards=n_eval_shards, index=shard_id)
    for shard_id in range(n_learners)
]
test_datasets = [
    test_dataset.shard(num_shards=n_eval_shards, index=shard_id)
    for shard_id in range(n_learners, n_eval_shards)
]

# Normalise and batch every shard; only the training shards are shuffled
autotune = tf.data.experimental.AUTOTUNE
shuffle_buffer = n_datapoints // n_learners
for learner_id in range(n_learners):
    train_datasets[learner_id] = (
        train_datasets[learner_id]
        .map(normalize_img, num_parallel_calls=autotune)
        .shuffle(shuffle_buffer)
        .batch(batch_size)
    )
    vote_datasets[learner_id] = (
        vote_datasets[learner_id]
        .map(normalize_img, num_parallel_calls=autotune)
        .batch(batch_size)
    )
    test_datasets[learner_id] = (
        test_datasets[learner_id]
        .map(normalize_img, num_parallel_calls=autotune)
        .batch(batch_size)
    )
# Define model
def get_model():
    """
    Build and compile a fresh copy of the MNIST convolutional classifier.

    Each call creates a new, independent model, so every learner can own
    its own copy. The input shape, number of classes and learning rate are
    read from the module-level ``width``, ``height``, ``n_classes`` and
    ``l_rate`` settings.

    :return: Compiled Keras model
    """
    input_img = tf.keras.Input(
        shape=(width, height, 1), name="Input"
    )
    x = tf.keras.layers.Conv2D(
        64, (3, 3), activation="relu", padding="same", name="Conv1_1"
    )(input_img)
    x = tf.keras.layers.BatchNormalization(name="bn1")(x)
    x = tf.keras.layers.MaxPooling2D((2, 2), name="pool1")(x)
    x = tf.keras.layers.Conv2D(
        128, (3, 3), activation="relu", padding="same", name="Conv2_1"
    )(x)
    x = tf.keras.layers.BatchNormalization(name="bn4")(x)
    x = tf.keras.layers.MaxPooling2D((2, 2), name="pool2")(x)
    x = tf.keras.layers.Flatten(name="flatten")(x)
    x = tf.keras.layers.Dense(
        n_classes, activation="softmax", name="fc1"
    )(x)
    model = tf.keras.Model(inputs=input_img, outputs=x)

    # `learning_rate` is the supported keyword; the old `lr` alias is deprecated
    opt = tf.keras.optimizers.Adam(learning_rate=l_rate)
    model.compile(
        loss="sparse_categorical_crossentropy",
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
        optimizer=opt)
    return model
# One KerasLearner per participant, each with its own model copy and its own
# train / vote / test shards. Votes are scored on accuracy, so higher is better.
all_learner_models = [
    KerasLearner(
        model=get_model(),
        train_loader=train_split,
        vote_loader=vote_split,
        test_loader=test_split,
        criterion="sparse_categorical_accuracy",
        minimise_criterion=False,
        model_evaluate_kwargs={"steps": vote_batches},
    )
    for train_split, vote_split, test_split
    in zip(train_datasets, vote_datasets, test_datasets)
]

# Give every learner the same weights to start off with
set_equal_weights(all_learner_models)
# Train the model using Collective Learning
results = Results()
starting_result = initial_result(all_learner_models)
results.data.append(starting_result)

# Plot scores under the name of the criterion the learners vote on
plot = ColearnPlot(score_name=all_learner_models[0].criterion)

for round_index in range(n_rounds):
    round_result = collective_learning_round(
        all_learner_models, vote_threshold, round_index
    )
    results.data.append(round_result)

    # Report progress after every round
    print_results(results)
    plot.plot_results_and_votes(results)

plot.block()
print("Colearn Example Finished!")
The first thing is to modify the data loading code. Each learner needs its own training and testing sets drawn from the data. This is easy to do with the dataset's shard method:
# Split the training data into one shard per learner
train_datasets = [
    train_dataset.shard(num_shards=n_learners, index=learner_index)
    for learner_index in range(n_learners)
]
The model definition is very similar too, except that each learner will need its own copy of the model, so we've moved it into a function.
To use collective learning, we need to create an object that implements the MachineLearningInterface.
To make it easier to use the MachineLearningInterface with Keras, we've defined KerasLearner.
KerasLearner implements standard training and evaluation routines as well as the MachineLearningInterface methods.
# ------------------------------------------------------------------------------
#
# Copyright 2021 Fetch.AI Limited
#
# Licensed under the Creative Commons Attribution-NonCommercial International
# License, Version 4.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://creativecommons.org/licenses/by-nc/4.0/legalcode
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
from inspect import signature
from typing import Optional
try:
import tensorflow as tf
except ImportError:
raise Exception("Tensorflow is not installed. To use the tensorflow/keras "
"add-ons please install colearn with `pip install colearn[keras]`.")
from tensorflow import keras
from colearn.ml_interface import MachineLearningInterface, Weights, ProposedWeights, ColearnModel, ModelFormat, convert_model_to_onnx
from colearn.ml_interface import DiffPrivBudget, DiffPrivConfig, TrainingSummary, ErrorCodes
from tensorflow_privacy.privacy.analysis.compute_dp_sgd_privacy import compute_dp_sgd_privacy
from tensorflow_privacy.privacy.optimizers.dp_optimizer_keras import make_keras_optimizer_class
class KerasLearner(MachineLearningInterface):
    """
    Tensorflow Keras learner implementation of machine learning interface.

    Wraps a Keras model together with its training, vote and (optional) test
    datasets. The score of the current weights on the vote dataset is cached
    in ``vote_score`` and is what proposed weights are compared against.
    """

    def __init__(self, model: keras.Model,
                 train_loader: tf.data.Dataset,
                 vote_loader: tf.data.Dataset,
                 test_loader: Optional[tf.data.Dataset] = None,
                 need_reset_optimizer: bool = True,
                 minimise_criterion: bool = True,
                 criterion: str = 'loss',
                 model_fit_kwargs: Optional[dict] = None,
                 model_evaluate_kwargs: Optional[dict] = None,
                 diff_priv_config: Optional[DiffPrivConfig] = None):
        """
        :param model: Keras model used for training
        :param train_loader: Training dataset
        :param vote_loader: Dataset used to score weights when voting
        :param test_loader: Optional test set. When not specified, the test score is reported as 0.
        :param need_reset_optimizer: True to clear optimizer history before training, False to keep history.
        :param minimise_criterion: Boolean - True to minimise value of criterion, False to maximise
        :param criterion: Name of the entry in the model.evaluate result used to measure model performance
        :param model_fit_kwargs: Arguments to be passed on model.fit function call
        :param model_evaluate_kwargs: Arguments to be passed on model.evaluate function call
        :param diff_priv_config: Contains differential privacy (dp) budget related configuration
        :raises Exception: If model_fit_kwargs or model_evaluate_kwargs do not match
                           the signature of model.fit / model.evaluate
        """
        self.model: keras.Model = model
        self.train_loader: tf.data.Dataset = train_loader
        self.vote_loader: tf.data.Dataset = vote_loader
        self.test_loader: Optional[tf.data.Dataset] = test_loader
        self.need_reset_optimizer = need_reset_optimizer
        self.minimise_criterion: bool = minimise_criterion
        self.criterion = criterion
        self.model_fit_kwargs = model_fit_kwargs or {}
        self.diff_priv_config = diff_priv_config
        # Total number of epochs trained so far; only advanced when
        # differential privacy is enabled (see mli_propose_weights).
        self.cumulative_epochs = 0

        if self.diff_priv_config is not None:
            self.diff_priv_budget = DiffPrivBudget(
                target_epsilon=self.diff_priv_config.target_epsilon,
                target_delta=self.diff_priv_config.target_delta,
                consumed_epsilon=0.0,
                # we will always use the highest available delta now
                consumed_delta=self.diff_priv_config.target_delta
            )

        # Epochs trained per proposal: the user-supplied value if there is one,
        # otherwise the default of the `epochs` parameter of model.fit.
        if 'epochs' in self.model_fit_kwargs:
            self.epochs_per_proposal = self.model_fit_kwargs['epochs']
        else:
            self.epochs_per_proposal = signature(self.model.fit).parameters['epochs'].default

        if model_fit_kwargs:
            # check that these are valid kwargs for model fit
            sig = signature(self.model.fit)
            try:
                sig.bind_partial(**self.model_fit_kwargs)
            except TypeError as err:
                raise Exception("Invalid arguments for model.fit") from err

        self.model_evaluate_kwargs = model_evaluate_kwargs or {}

        if model_evaluate_kwargs:
            # check that these are valid kwargs for model evaluate
            sig = signature(self.model.evaluate)
            try:
                sig.bind_partial(**self.model_evaluate_kwargs)
            except TypeError as err:
                raise Exception("Invalid arguments for model.evaluate") from err

        # Score of the initial weights; baseline for the first vote
        self.vote_score: float = self.test(self.vote_loader)

    def reset_optimizer(self):
        """
        Recompiles the Keras model. This way the optimizer history get erased,
        which is needed before a new training round, otherwise the outdated history is used.
        """
        compile_args = self.model._get_compile_args()  # pylint: disable=protected-access
        opt_config = self.model.optimizer.get_config()

        if self.diff_priv_config is not None:
            # tensorflow_privacy optimizers get_config() miss the additional parameters
            # was fixed here: https://github.com/tensorflow/privacy/commit/49db04e3561638fc02795edb5774d322cdd1d7d1
            # but it is not yet in the stable version, thus I need here to do the same.
            opt_config.update({
                'l2_norm_clip': self.model.optimizer._l2_norm_clip,  # pylint: disable=protected-access
                'noise_multiplier': self.model.optimizer._noise_multiplier,  # pylint: disable=protected-access
                'num_microbatches': self.model.optimizer._num_microbatches,  # pylint: disable=protected-access
            })
            # Rebuild the differentially private wrapper around the base optimizer class
            new_opt = make_keras_optimizer_class(
                getattr(keras.optimizers, opt_config['name'])
            ).from_config(opt_config)
            compile_args['optimizer'] = new_opt
        else:
            # Fresh optimizer of the same class and configuration, without history
            compile_args['optimizer'] = getattr(keras.optimizers,
                                                opt_config['name']).from_config(opt_config)

        self.model.compile(**compile_args)

    def mli_propose_weights(self) -> Weights:
        """
        Trains model on training set and returns new weights after training
        - Current model is reverted to original state after training,
          also when training raises
        - With differential privacy enabled, no training takes place if it would
          exceed the target epsilon; the current weights are returned with the
          DP_BUDGET_EXCEEDED error code instead
        :return: Weights after training
        """
        current_weights = self.mli_get_current_weights()

        if self.diff_priv_config is not None:
            epsilon_after_training = self.get_privacy_budget()
            if epsilon_after_training > self.diff_priv_budget.target_epsilon:
                return Weights(
                    # pass the raw weight values, not the Weights wrapper itself
                    weights=current_weights.weights,
                    training_summary=TrainingSummary(
                        dp_budget=self.diff_priv_budget,
                        error_code=ErrorCodes.DP_BUDGET_EXCEEDED
                    )
                )

        try:
            self.train()
            new_weights = self.mli_get_current_weights()
        finally:
            # the proposal must not change this learner's own model
            self.set_weights(current_weights)

        if self.diff_priv_config is not None:
            # record the privacy cost of the training that just happened
            self.diff_priv_budget.consumed_epsilon = epsilon_after_training
            self.cumulative_epochs += self.epochs_per_proposal
            new_weights.training_summary = TrainingSummary(dp_budget=self.diff_priv_budget)

        return new_weights

    def mli_test_weights(self, weights: Weights) -> ProposedWeights:
        """
        Tests given weights on vote and test set and returns weights with score values
        - Current model is reverted to its own weights afterwards,
          also when evaluation raises
        :param weights: Weights to be tested
        :return: ProposedWeights - Weights with vote and test score
        """
        current_weights = self.mli_get_current_weights()
        try:
            self.set_weights(weights)

            vote_score = self.test(self.vote_loader)

            if self.test_loader is not None:
                test_score = self.test(self.test_loader)
            else:
                test_score = 0

            vote = self.vote(vote_score)
        finally:
            self.set_weights(current_weights)

        return ProposedWeights(weights=weights,
                               vote_score=vote_score,
                               test_score=test_score,
                               vote=vote,
                               )

    def vote(self, new_score) -> bool:
        """
        Compares current model score with proposed model score and returns vote
        :param new_score: Proposed score
        :return: bool positive or negative vote
        """
        # The comparison is strict: an equal score is a negative vote
        if self.minimise_criterion:
            return new_score < self.vote_score
        else:
            return new_score > self.vote_score

    def mli_accept_weights(self, weights: Weights):
        """
        Updates the model with the proposed set of weights
        and refreshes the cached vote score
        :param weights: The new weights
        """
        self.set_weights(weights)
        self.vote_score = self.test(self.vote_loader)

    def get_train_batch_size(self) -> int:
        """
        Calculates train batch size.
        Reads the private `_batch_size` attribute of the training dataset, or of
        the dataset it wraps when the outer dataset does not have one.
        :return: Batch size of the training dataset
        """
        if hasattr(self.train_loader, '_batch_size'):
            return self.train_loader._batch_size  # pylint: disable=protected-access
        else:
            return self.train_loader._input_dataset._batch_size  # pylint: disable=protected-access

    def get_privacy_budget(self) -> float:
        """
        Calculates, what epsilon will apply after another model training.
        Need to calculate it in advance to see if another training would result in privacy budget violation.
        :return: Epsilon that would be consumed after one more proposal
        """
        batch_size = self.get_train_batch_size()
        iterations_per_epoch = tf.data.experimental.cardinality(self.train_loader).numpy()
        # sample count is taken as batch size times number of batches
        n_samples = batch_size * iterations_per_epoch
        planned_epochs = self.cumulative_epochs + self.epochs_per_proposal

        epsilon, _ = compute_dp_sgd_privacy(
            n=n_samples,
            batch_size=batch_size,
            noise_multiplier=self.diff_priv_config.noise_multiplier,  # type: ignore
            epochs=planned_epochs,
            delta=self.diff_priv_budget.target_delta
        )
        return epsilon

    def mli_get_current_weights(self) -> Weights:
        """
        :return: The current weights of the model
        """
        return Weights(weights=self.model.get_weights())

    def mli_get_current_model(self) -> ColearnModel:
        """
        :return: The current model and its format
        """
        return ColearnModel(
            model_format=ModelFormat(ModelFormat.ONNX),
            model_file="",
            model=convert_model_to_onnx(self.model),
        )

    def set_weights(self, weights: Weights):
        """
        Rewrites weight of current model
        :param weights: Weights to be stored
        """
        self.model.set_weights(weights.weights)

    def train(self):
        """
        Trains the model on the training dataset
        """
        if self.need_reset_optimizer:
            # erase the outdated optimizer memory (momentums mostly)
            self.reset_optimizer()

        self.model.fit(self.train_loader, **self.model_fit_kwargs)

    def test(self, loader: tf.data.Dataset) -> float:
        """
        Tests performance of the model on specified dataset
        :param loader: Dataset for testing
        :return: Value of performance metric
        """
        result = self.model.evaluate(x=loader, return_dict=True,
                                     **self.model_evaluate_kwargs)
        # pick out the metric this learner votes on
        return result[self.criterion]
We create a set of KerasLearners by passing in the model and the datasets:
# Build one KerasLearner per shard triple (train, vote, test)
all_learner_models = [
    KerasLearner(
        model=get_model(),
        train_loader=train_split,
        vote_loader=vote_split,
        test_loader=test_split,
        criterion="sparse_categorical_accuracy",
        minimise_criterion=False,
        model_evaluate_kwargs={"steps": vote_batches},
    )
    for train_split, vote_split, test_split
    in zip(train_datasets, vote_datasets, test_datasets)
]
Then we give all the models the same weights to start off with:
# Start every learner from the same weights
set_equal_weights(all_learner_models)
And then we can move on to the final stage, which is training with Collective Learning.
The function collective_learning_round performs one round of collective learning.
One learner is selected to train and propose an update.
The other learners vote on the update, and if the vote passes then the update is accepted.
Then a new round begins.
# Train the model using Collective Learning
results = Results()
results.data.append(initial_result(all_learner_models))

# Use the plotting helper that the full script imports
plot = ColearnPlot(score_name=all_learner_models[0].criterion)

# (the loop variable is not called `round`, to avoid shadowing the builtin)
for round_index in range(n_rounds):
    results.data.append(
        collective_learning_round(all_learner_models,
                                  vote_threshold, round_index)
    )
    # Report and plot progress after every round
    print_results(results)
    plot.plot_results_and_votes(results)

plot.block()