Communication

The communication module is used to establish p2p communications for transferring information. The p2p_com module can be used to chat or transfer files over a period of time. The Federated_hook module is used for p2p communication in a federated learning environment.

p2p_com

This module is for p2p communication between two nodes. The communication is done via sockets; for now the messages are not encrypted. For external networks you may need to open ports.

Assume you have two nodes. The ip:port of node1 is 123.456.789:4444 and that of node2 is 987.654.321:5555. To start a p2p communication do the following:

On node1 run

>>> receiver = Receiver('123.456.789', '4444')
>>> sender = Sender('987.654.321', '5555')
>>> threads = [receiver.start(), sender.start()]

On node2 run

>>> receiver = Receiver('987.654.321', '5555')
>>> sender = Sender('123.456.789', '4444')
>>> threads = [receiver.start(), sender.start()]

This code was taken from https://www.webcodegeeks.com/python/python-network-programming-tutorial/
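For reference, here is a minimal sketch of how such Receiver and Sender classes could be built on threading.Thread and TCP sockets. All implementation details below are assumptions for illustration, not the module's actual code:

```python
import socket
import threading


class Receiver(threading.Thread):
    """Hypothetical sketch: listens on my_host:my_port and collects messages."""

    def __init__(self, my_host, my_port):
        super().__init__(daemon=True)
        # Bind in the constructor so the port is ready before the thread
        # starts (avoids a connect-before-bind race in this sketch).
        self._srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._srv.bind((my_host, int(my_port)))
        self._srv.listen(1)
        self.my_host, self.my_port = self._srv.getsockname()[:2]
        self.received = []  # messages collected so far

    def run(self):
        conn, _ = self._srv.accept()
        with conn:
            data = conn.recv(4096)
            if data:
                self.received.append(data.decode())
        self._srv.close()


class Sender(threading.Thread):
    """Hypothetical sketch: connects to a peer and sends one message."""

    def __init__(self, my_friends_host, my_friends_port, message="hello"):
        super().__init__(daemon=True)
        self.my_friends_host = my_friends_host
        self.my_friends_port = int(my_friends_port)
        self.message = message

    def run(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((self.my_friends_host, self.my_friends_port))
            s.sendall(self.message.encode())
```

Binding the listening socket in the constructor rather than in run() keeps the sketch deterministic; a production version would loop over accept() and frame messages instead of reading a single buffer.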

class layers.communication.p2p_com.Receiver(my_host, my_port)

This class will receive messages

Parameters
  • my_host (str) – My local ip address

  • my_port (str) – My local port

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class layers.communication.p2p_com.Sender(my_friends_host, my_friends_port)

This class is for p2p communication between two nodes. The communication is done via sockets; for now the messages are not encrypted. This class sends messages.

Parameters
  • my_friends_host (str) – IP address of the node you want to send a message to

  • my_friends_port (str) – Port of the node you want to send a message to

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

layers.communication.p2p_com.main()

main() can be used to test the p2p service. Open two consoles and try it.

Federated_hook

We use the code below for data transactions of large variables in the BSMD. In particular, we use the socket implementation of coMind for transferring weights, and we add a second layer to record all transactions in the BSMD. This code was taken from comind.org.

Copyright 2018 coMind. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class layers.communication.federated_hook._FederatedHook(is_chief, name, private_ip, public_ip, private_key, list_of_workers, domain, ip, wait_time=30, interval_steps=100)

Provides a hook to implement federated averaging with tensorflow.

In a typical synchronous training environment, gradients will be averaged each step and then applied to the variables in one shot, after which replicas can fetch the new variables and continue. In a federated average training environment, model variables will be averaged every ‘interval_steps’ steps, and then the replicas will fetch the new variables and continue training locally. In the interval between two average operations, there is no data transfer, which can accelerate training.
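The averaging operation itself is a plain element-wise mean over the workers' variables. A small NumPy sketch, assuming each worker's model is represented as a list of weight arrays:

```python
import numpy as np


def federated_average(worker_weights):
    """Element-wise average of each variable across workers.

    worker_weights: one entry per worker; each entry is a list of
    numpy arrays (one array per model variable).
    """
    # zip(*...) groups the i-th variable of every worker together,
    # then np.mean stacks and averages each group along axis 0.
    return [np.mean(group, axis=0) for group in zip(*worker_weights)]


# Two workers, each holding one 2-element weight vector:
w1 = [np.array([1.0, 3.0])]
w2 = [np.array([3.0, 5.0])]
avg = federated_average([w1, w2])  # → [array([2., 4.])]
```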

The hook works in two different ways depending on whether it is the chief worker or not.

The chief starts by creating a socket that will act as server. It then waits _wait_time seconds, accepting connections from all the workers that want to join the training, and distributes a task index to each of them. This task index is not always necessary. In our demos we use it to tell each worker which part of the data-set it has to use for training, and it could have other applications.

Remember that if your training is not going to be performed on a LAN, you will need to do some port forwarding; the authors recommend having a look at the article they wrote about it.

Once the training is about to start, the chief sends its weights to the other workers, so that they all start with the same initial ones. After each batch is trained, it checks whether _interval_steps has been completed, and if so, it gathers the weights of all the workers and its own, averages them, and sends the average back to the workers.

Workers open a socket connection to the chief and wait to get their worker number. Once the training is about to start, they wait for the chief to send them its weights. After each training round they check whether _interval_steps has been completed, and if so, they send their weights to the chief and wait for its response: the averaged weights with which they will continue training.

Parameters
  • is_chief (bool) – whether it is going to act as chief or not.

  • name (str) – name of the node in the BSMD

  • private_ip (str) – complete local ip in which the chief is going to serve its socket. Example: 172.134.65.123:7777

  • public_ip (str) – ip to which the workers are going to connect.

  • private_key (str) – private key of the node for signing the transactions

  • list_of_workers (list[str]) – list of all the nodes that are willing to participate. In theory the chief node knows the list, as it creates the domain and accounts for the participants

  • domain (str) – name of the domain

  • ip (str) – ip address for connecting to the BSMD

  • wait_time (int,optional) – how long the chief should wait at the beginning for the workers to connect.

  • interval_steps (int,optional) – number of steps between two “average op”, which specifies how frequent a model synchronization is performed
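The interval-based synchronization described above can be sketched as a plain training loop. Here train_one_batch and exchange_with_chief are hypothetical stand-ins for the local training step and the socket round-trip with the chief; they are not part of the hook's API:

```python
def worker_loop(local_weights, train_one_batch, exchange_with_chief,
                interval_steps, total_steps):
    """Hypothetical worker loop: train locally, sync every interval_steps.

    Between two sync points there is no data transfer, which is what
    makes federated averaging cheaper than per-step synchronization.
    """
    for step in range(1, total_steps + 1):
        local_weights = train_one_batch(local_weights)
        if step % interval_steps == 0:
            # Send local weights to the chief, receive the average back.
            local_weights = exchange_with_chief(local_weights)
    return local_weights


# Toy run: "training" adds 1 to a scalar weight, "averaging" halves it.
result = worker_loop(0.0, lambda w: w + 1.0, lambda w: w / 2.0,
                     interval_steps=2, total_steps=4)  # → 1.5
```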

_assign_vars(local_vars)

Utility to refresh local variables.

Parameters

local_vars – List of local variables

Returns

The ops to assign value of global vars to local vars.

_create_placeholders()

Creates the placeholders that we will use to inject the weights into the graph

_get_np_array(connection_socket)

Routine to receive a list of numpy arrays.

Parameters

connection_socket – a socket with a connection already established.

_get_task_index()

The chief distributes a task index number to each worker that connects to it and lets them know how many workers there are in total.

Returns

the task index corresponding to this worker and the total number of workers.
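The index assignment can be pictured as follows. This is a hypothetical sketch; the real implementation hands out the indices over the socket as workers connect:

```python
def distribute_task_indices(worker_ids):
    """Assign each worker a sequential task index plus the worker total.

    Returns a mapping worker_id -> (task_index, total_workers), mirroring
    what each worker would learn from the chief.
    """
    total = len(worker_ids)
    return {wid: (idx, total) for idx, wid in enumerate(worker_ids)}


distribute_task_indices(['alice', 'bob'])
# → {'alice': (0, 2), 'bob': (1, 2)}
```

Each worker can then use its index to pick a disjoint shard of the data-set, as the demos mentioned above do.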

static _receiving_subroutine(connection_socket)

Subroutine inside _get_np_array to receive a list of numpy arrays. If the message was not correctly received, it sends an error message back to the sender so the transfer can be retried.

Parameters

connection_socket – a socket with a connection already established.

Returns

static _send_np_array(arrays_to_send, connection_socket, iteration, tot_workers, sender, private_key, receiver, domain, ip, list_participants=None)

Sends weights to nodes via a socket. It also writes the transaction in the BSMD.

Parameters
  • arrays_to_send – weights to be sent

  • connection_socket

  • iteration (int) – iteration number in the federated process

  • tot_workers (int) – total number of nodes in the federated process

  • sender (str) – name of the node sending the information

  • private_key (str) – private key of the node sending the transaction

  • receiver (str) – name of the receiver

  • list_participants (list, optional) – list of participants in the federated process. This variable is only needed in the first loop
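A common way to implement this kind of numpy-array exchange over a raw socket is to serialize the list and length-prefix the payload so the receiver knows exactly how many bytes to read. The sketch below illustrates the general technique and is an assumption, not coMind's actual wire format:

```python
import pickle
import struct


def send_np_array(arrays, sock):
    """Serialize a list of numpy arrays and send it with a 4-byte
    big-endian length prefix."""
    payload = pickle.dumps(arrays)
    sock.sendall(struct.pack('>I', len(payload)) + payload)


def get_np_array(sock):
    """Read the length prefix, then read exactly that many bytes
    and deserialize them back into a list of arrays."""
    header = _recv_exact(sock, 4)
    (length,) = struct.unpack('>I', header)
    return pickle.loads(_recv_exact(sock, length))


def _recv_exact(sock, n):
    # recv() may return fewer bytes than asked for, so loop until done.
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError('socket closed mid-message')
        buf += chunk
    return buf
```

The real _receiving_subroutine additionally sends an error message back and retries when a transfer arrives corrupted; that error path is omitted here.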

_start_socket_server()

Creates a socket with ssl protection that will act as server.

Returns

ssl secured socket that will act as server.

_start_socket_worker()

Creates a socket with ssl protection that will act as client.

Returns

ssl secured socket that will work as client.

after_create_session(session, coord)
If chief:

Once the training is about to start, the chief sends its weights to the other workers so that they all start with the same initial ones. Once it has sent the weights to all the workers, it sends them a signal to start training.

Workers:

Wait for the chief to send them its weights and inject them into the graph.

Parameters
  • session

  • coord

after_run(run_context, run_values)

Both chief and workers check whether they should average their weights in this round. If this is the case:

If chief:

It tries to gather the weights of all the workers, but ignores those that lost connection at some point. It averages them and then sends them back to the workers. Finally, it injects the averaged weights into its own graph.

Workers:

Send their weights to the chief. Wait for the chief to send them the averaged weights and inject them into their graph.

before_run(run_context)

Session before_run

begin()

Session begin

end(session)

Session end