Example of a PyTorch Custom Layer

When I create neural software systems, I most often use the PyTorch library. The Keras library is very good for basic neural systems but for advanced architectures I like the flexibility of PyTorch. Using raw TensorFlow without Keras is an option, but I am more comfortable using the PyTorch APIs.


An example of a custom NoisyLinear() layer. Notice the two outputs are slightly different.

I hadn’t looked at the problem of creating a custom PyTorch Layer in several months, so I figured I’d code up a demo. The most fundamental layer is Linear(). For a 4-7-3 neural network (four input nodes, one hidden layer with seven nodes, three output nodes), a definition could look like:

import torch as T

class Net(T.nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.hid1 = T.nn.Linear(4, 7)  # 4-7-3
    self.oupt = T.nn.Linear(7, 3)  # default init

  def forward(self, x):
    z = T.tanh(self.hid1(x))
    z = self.oupt(z)
    return z

For my demo, I decided to create a custom NoisyLinear() layer that works just like a standard Linear() layer but injects randomness. This isn’t particularly useful by itself but I’m just experimenting. So I wanted a 4-7-3 network to work like this:

class Net(T.nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.hid1 = NoisyLinear(4, 7)  # 4-7-3
    self.oupt = NoisyLinear(7, 3)

  def forward(self, x):
    z = T.tanh(self.hid1(x))
    z = self.oupt(z) 
    return z

In other words, everything is the same except I use the program defined NoisyLinear() instead of the built-in torch.nn.Linear() layer. The custom layer definition I came up with is:

class NoisyLinear(T.nn.Module):
  def __init__(self, n_in, n_out):
    super().__init__()
    self.n_in, self.n_out = n_in, n_out

    self.weights = T.nn.Parameter(T.zeros((n_out, n_in),
      dtype=T.float32))
    self.bias = T.nn.Parameter(T.tensor(n_out,
      dtype=T.float32))
    self.lo = 0.90; self.hi = 0.98  # noise

    lim = 0.01  # initialize weights and bias
    T.nn.init.uniform_(self.weights, -lim, +lim)
    T.nn.init.uniform_(self.bias, -lim, +lim)

  def forward(self, x):
    wx= T.mm(x, self.weights.t())
    rnd = (self.hi - self.lo) * T.rand(1) + self.lo
    return rnd * T.add(wx, self.bias)  # wts * x + bias

The Parameter() class makes the weights and the bias trainable. I used basic uniform initialization with hard-coded range [-0.01, +0.01]. The forward() method computes weights * inputs + bias as usual, but then multiples the results by random noise in the range [0.90, 0.98]. Each time the forward() method of a NoisyLayer() layer instance is called, the result will be slightly different.

Writing a custom layer for PyTorch is rarely needed, but compared to alternative libraries, customizing PyTorch is relatively easier — with an emphasis on “relatively”.



Three well-known custom cars. Left: Dodge Deodora (1965). Center: Norman Timbs Special (1947). Right: Chrysler Thunderbolt (1941).


Complete demo code below. Long.

# iris_noisy_layer.py
# creating a custom "NoisyLinear" layer
# PyTorch 1.9.0-CPU Anaconda3-2020.02  Python 3.7.6
# Windows 10 

import numpy as np
import torch as T

device = T.device("cpu")  # to Tensor or Module

# -----------------------------------------------------------

class NoisyLinear(T.nn.Module):
  def __init__(self, n_in, n_out):
    super().__init__()
    self.n_in, self.n_out = n_in, n_out

    self.weights = T.nn.Parameter(T.zeros((n_out, n_in),
      dtype=T.float32))
    self.bias = T.nn.Parameter(T.tensor(n_out,
      dtype=T.float32))
    self.lo = 0.90; self.hi = 0.98  # noise

    lim = 0.01  # initialize weights and bias
    T.nn.init.uniform_(self.weights, -lim, +lim)
    T.nn.init.uniform_(self.bias, -lim, +lim)

  def forward(self, x):
    wx= T.mm(x, self.weights.t())
    rnd = (self.hi - self.lo) * T.rand(1) + self.lo
    return rnd * T.add(wx, self.bias)  # wts * x + bias

# -----------------------------------------------------------

class IrisDataset(T.utils.data.Dataset):
  def __init__(self, src_file, num_rows=None):
    # 5.0, 3.5, 1.3, 0.3, 0
    tmp_x = np.loadtxt(src_file, max_rows=num_rows,
      usecols=range(0,4), delimiter=",", skiprows=0,
      dtype=np.float32)
    tmp_y = np.loadtxt(src_file, max_rows=num_rows,
      usecols=4, delimiter=",", skiprows=0,
      dtype=np.int64)

    self.x_data = T.tensor(tmp_x, dtype=T.float32)
    self.y_data = T.tensor(tmp_y, dtype=T.int64)

  def __len__(self):
    return len(self.x_data)

  def __getitem__(self, idx):
    if T.is_tensor(idx):
      idx = idx.tolist()
    preds = self.x_data[idx]
    spcs = self.y_data[idx] 
    sample = { 'predictors' : preds, 'species' : spcs }
    return sample

# -----------------------------------------------------------

class Net(T.nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.hid1 = NoisyLinear(4, 7)  # 4-7-3
    self.oupt = NoisyLinear(7, 3)

  def forward(self, x):
    z = T.tanh(self.hid1(x))
    z = self.oupt(z)  # no softmax: CrossEntropyLoss() 
    return z

# -----------------------------------------------------------

def accuracy(model, dataset):
  # assumes model.eval()
  dataldr = T.utils.data.DataLoader(dataset, batch_size=1,
    shuffle=False)
  n_correct = 0; n_wrong = 0
  for (_, batch) in enumerate(dataldr):
    X = batch['predictors'] 
    # Y = T.flatten(batch['species'])
    Y = batch['species']  # already flattened by Dataset
    with T.no_grad():
      oupt = model(X)  # logits form

    big_idx = T.argmax(oupt)
    # if big_idx.item() == Y.item():
    if big_idx == Y:
      n_correct += 1
    else:
      n_wrong += 1

  acc = (n_correct * 1.0) / (n_correct + n_wrong)
  return acc

# -----------------------------------------------------------

def main():
  # 0. get started
  print("\nBegin Iris custom NoisyLinear layer demo \n")
  T.manual_seed(1)
  np.random.seed(1)
  
  # 1. create Dataset and DataLoader objects
  print("Creating Iris train DataLoader ")

  train_file = ".\\Data\\iris_train.txt"
  train_ds = IrisDataset(train_file, num_rows=120)

  bat_size = 4
  train_ldr = T.utils.data.DataLoader(train_ds,
    batch_size=bat_size, shuffle=True)

  # 2. create network
  net = Net().to(device)

  # 3. train model
  max_epochs = 20
  ep_log_interval = 4
  lrn_rate = 0.05

  loss_func = T.nn.CrossEntropyLoss()  # applies softmax()
  optimizer = T.optim.SGD(net.parameters(), lr=lrn_rate)

  print("\nbat_size = %3d " % bat_size)
  print("loss = " + str(loss_func))
  print("optimizer = SGD")
  print("max_epochs = %3d " % max_epochs)
  print("lrn_rate = %0.3f " % lrn_rate)

  print("\nStarting training")
  net.train()
  for epoch in range(0, max_epochs):
    epoch_loss = 0  # for one full epoch
    num_lines_read = 0

    for (batch_idx, batch) in enumerate(train_ldr):
      X = batch['predictors']  # [10,4]
      Y = batch['species']  # OK; alreay flattened

      optimizer.zero_grad()
      oupt = net(X)
      loss_val = loss_func(oupt, Y)  # a tensor
      epoch_loss += loss_val.item()  # accumulate
      loss_val.backward()            # gradients
      optimizer.step()               # update wts

    if epoch % ep_log_interval == 0:
      print("epoch = %4d   loss = %0.4f" % (epoch, epoch_loss))
  print("Done ")

  # 4. evaluate model accuracy
  print("\nComputing model accuracy")
  net.eval()
  acc = accuracy(net, train_ds)  # item-by-item
  print("Accuracy on train data = %0.4f" % acc)

  # 5. make a prediction
  print("\nPredicting species for [6.1, 3.1, 5.1, 1.1]: ")
  x = np.array([[6.1, 3.1, 5.1, 1.1]], dtype=np.float32)
  x = T.tensor(x, dtype=T.float32).to(device) 

  with T.no_grad():
    logits = net(x).to(device)  # values do not sum to 1.0
  probs = T.softmax(logits, dim=1).to(device)
  T.set_printoptions(precision=4)
  print(probs)

  print("\nPredicting again for [6.1, 3.1, 5.1, 1.1]: ")
  x = np.array([[6.1, 3.1, 5.1, 1.1]], dtype=np.float32)
  x = T.tensor(x, dtype=T.float32).to(device) 

  with T.no_grad():
    logits = net(x).to(device)  # values do not sum to 1.0
  probs = T.softmax(logits, dim=1).to(device)
  T.set_printoptions(precision=4)
  print(probs)

  print("\nEnd custom NoisyLinear layer demo")

if __name__ == "__main__":
  main()
This entry was posted in PyTorch. Bookmark the permalink.

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s