
Making use of hardware keys of Zyxel NSA320 Network Attached Storage in Debian Linux

If you own a Zyxel NSA320 and haven't tried installing a custom Linux OS on it, you are doing it wrong! With serial headers sticking prominently out of the motherboard, this network attached storage was built to be hacked any way you want. I won't go into how to install your distro of choice on it, as that's just a Google search away. For Debian Linux, try the precompiled images from forum.doozan.com.

One subtle problem with the Debian images from forum.doozan.com is that the hardware keys of the device do not work. So I wrote a small Python script to make the POWER key and the COPY key somewhat functional. With the script installed, holding the POWER key for more than 4 seconds and then releasing it restarts the NAS. Pressing the COPY key copies all files from the first VFAT/NTFS partition to a location in the filesystem that you configure. If the COPY button seems unresponsive, hold it for a second or two. For extra coolness, the COPY LED turns red during the copy operation! =)
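To make the POWER key behaviour concrete, here is a minimal sketch of the hold-then-release check, separated from the real input device so it can be tried on any machine. The `HoldDetector` class and its `feed` method are names made up for this illustration; the actual script keeps the same state inside its event dispatcher, and the timestamps below are invented.

```python
POWER_BUTTON_HOLD_PERIOD = 4  # seconds, same default as in the script

KEY_PRESSED = 1
KEY_RELEASED = 0


class HoldDetector(object):
    """Hypothetical helper: remembers the press time, decides on release."""

    def __init__(self, hold_period):
        self.hold_period = hold_period
        self.press_time = None

    def feed(self, value, timestamp):
        """Return True only when a release ends a long enough hold."""
        if value == KEY_PRESSED:
            self.press_time = timestamp
            return False
        if value == KEY_RELEASED and self.press_time is not None:
            held_for = timestamp - self.press_time
            self.press_time = None  # forget the press either way
            return held_for > self.hold_period
        return False


detector = HoldDetector(POWER_BUTTON_HOLD_PERIOD)

# A 1.5 second tap should not trigger anything.
short_press = [detector.feed(KEY_PRESSED, 10.0),
               detector.feed(KEY_RELEASED, 11.5)]

# A 4.5 second hold should trigger on release, not on press.
long_press = [detector.feed(KEY_PRESSED, 20.0),
              detector.feed(KEY_RELEASED, 24.5)]
```

Deciding on release rather than on a timer keeps the logic stateless between events, at the cost that nothing happens until you let go of the button.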

You can find the script here. The installation instructions are given in the comments/docstrings. Don't forget to change the configuration variables to suit your needs.

Here's the source code for convenience:

#!/usr/bin/env python
"""This script monitors the COPY and POWER keys of the Zyxel NSA320
Network Attached Storage and takes appropriate actions when key events
are detected.
This is an effort to make the hardware keys in the NSA320 useful with
Debian Linux OS.

Usage: hwkeys.py [-d]

       While the script is running, pressing the COPY button (you might
       have to hold it for around 2 seconds) will copy all the data from
       the first vfat/ntfs partition to a directory configured in the script.

       Pressing and holding the POWER button for more than 4 seconds and
       then releasing it will cause the system to restart.


Options:
  -d / --daemonize : Keep the script running in the background.


Requirements:
  - NSA320 Network attached storage with Debian Linux OS.
    The script was tested with compiled version of Debian
    Wheezy from: http://forum.doozan.com/list.php?2

  - python2.7
  - python-evdev [ pip install evdev ]
  - python-daemon [ pip install python-daemon]

  - System commands: blkid, mkdir, mountpoint, mount, umount, sync, shutdown
    Most of them are installed in the system by default.


Installation:
  - Install the requirements.

  - Change COPY_MOUNT_DIR and COPY_DESTINATION_DIR variables
    in the script. Make sure the directories exist.

  - Test the script by running in normal mode:
      python2.7 hwkeys.py

  - Install the script in '/etc/rc.local' by adding this line
    (use the full path to wherever you saved the script):
      python2.7 /path/to/hwkeys.py -d


TODO:
  - In daemon mode, the log generated from this script should go to syslog.


Author: Titon Barua <titon@vimmaniac.com>

Copyright: 2014 Vimmaniac Pvt. Ltd., Bangladesh.

License: This software is distributed under FreeBSD License.
         Please see http://www.freebsd.org/copyright/freebsd-license.html
         for full license text.
"""

import sys
import time
import shutil
from glob import glob
from datetime import datetime
from os.path import join as pjoin
from asyncore import file_dispatcher, loop
from subprocess import check_call, check_output, CalledProcessError

from daemon import DaemonContext
from evdev import InputDevice, ecodes

USAGE = """\
Usage: hwkeys.py [-d]

Description: Monitors two hardware keys (COPY and POWER) of NSA320 for
             activities and takes actions when appropriate events are detected.

Options: -d / --daemonize : Keep the script running in the background.
"""

# Configuration variables.
#--------------------------------------------------------------------\
ENABLE_RESTART_WITH_POWER_BUTTON = True
ENABLE_QUICK_COPY_WITH_COPY_BUTTON = True

# Power button must be held for more than this number of seconds and
# then released for the restart operation to be activated.
POWER_BUTTON_HOLD_PERIOD = 4

# Directory where the attached VFAT/NTFS partitions are mounted.
COPY_MOUNT_DIR = "/media/for_copying"

# Directory where copied data is dumped.
COPY_DESTINATION_DIR = "/media/storehouse2/quick_copies"

# Filesystems to look out for. This is the silliest way to detect
# typical removable storage.
COPY_TARGET_FILESYSTEMS = ('vfat', 'ntfs')
#---------------------------------------------------------------------/
# End of configuration variables.



INPUT_DEVICE = "/dev/input/event0"

KEY_PRESSED = 1
KEY_RELEASED = 0



def switch_led(name, color, state):
  """A simple wrapper to control leds in NSA320."""
  ledname = '/sys/class/leds/nsa320:{0}:{1}/brightness'.format(color, name)
  with open(ledname, 'w') as f:
    f.write('1' if state else '0')



def execute_restart_operation():
  if not ENABLE_RESTART_WITH_POWER_BUTTON: return
  print("Restarting the system ...")
  check_call(['shutdown', '-r', 'now'])



def execute_copy_operation():
  if not ENABLE_QUICK_COPY_WITH_COPY_BUTTON: return
  print("Trying to copy from attached removable drive ...")

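  def extract_blkid_info(blkid_output):
    """Create a list of dictionaries from 'blkid -o udev' output."""
    # Each blank-line-separated block describes one partition. Split each
    # line on the first '=' only, since values may contain '=' themselves.
    return [
      dict(f.strip().split("=", 1)
           for f in infoblock.strip().split("\n") if "=" in f)
      for infoblock in blkid_output.strip().split("\n\n")
      if infoblock.strip()]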

  # Select the first partition attached to the system with appropriate
  # filesystem(default: vfat or ntfs) as our source partition. The
  # 'first' here is arbitrary and does not guarantee any order.
  #-------------------------------------------------------------------\
  partinfo = extract_blkid_info(check_output(['blkid', '-o', 'udev']))
  try:
    partition = [
      x for x in partinfo
      if x.get('ID_FS_TYPE') in COPY_TARGET_FILESYSTEMS][0]
  except IndexError:
    print("No partition suitable for copy operation was found.")
    return
  #-------------------------------------------------------------------/

  uuid = partition['ID_FS_UUID']
  label = partition.get('ID_FS_LABEL', '')  # Not every partition has a label.

  print(
    "Copying from partition with uuid='{0}' and label='{1}' ..."
    .format(uuid, label))

  device = "/dev/disk/by-uuid/{0}".format(uuid)
  mountpoint = pjoin(COPY_MOUNT_DIR, uuid)

  # Create mount-point if it does not exist already.
  try: check_call(['mkdir', '-p', mountpoint])
  except CalledProcessError:
    print("ERROR: Failed to create mount directory!")
    return

  # Unmount mount-point if it is mounted already.
  #------------------------------------------------------------------\
  try: 
    check_call(['mountpoint', '-q', mountpoint])

    print("WARNING: Mountpoint already in use! Trying to unmount ...")
    try: check_call(['umount', mountpoint])
    except CalledProcessError:
      print("ERROR: Failed to unmount already mounted mount point!")
      return
  except CalledProcessError:
    pass
  #------------------------------------------------------------------/

  print("Mounting disk with uuid={0} ...".format(uuid))
  try: check_call(['mount', device, mountpoint])
  except CalledProcessError:
    print("ERROR: Failed to mount device!")
    return

  currtime = datetime.utcnow().replace(microsecond=0)
  currtime = currtime.isoformat('_')

  dest = pjoin(
    COPY_DESTINATION_DIR, "{0}__{1}__{2}".format(uuid, label, currtime))

  print("Copying from {0} to {1} ...".format(mountpoint, dest))

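  # Start the actual copy operation. The red LED stays on while copying.
  switch_led('copy', 'red', True)
  try:
    shutil.copytree(mountpoint, dest)
  except (shutil.Error, IOError, OSError) as e:
    # Don't let a failed copy leave the LED on or kill the event loop.
    switch_led('copy', 'red', False)
    print("ERROR: Copy operation failed: {0}".format(e))
    return

  # Sync the filesystem.
  switch_led('copy', 'green', True)
  check_call(['sync'])

  # Show two seconds of green to indicate success.
  switch_led('copy', 'red', False)
  print("Copy operation was successful.")
  time.sleep(2.0)
  switch_led('copy', 'green', False)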



class InputDeviceDispatcher(file_dispatcher):
  def __init__(self, device):
    self.device = device
    file_dispatcher.__init__(self, device)

    self.power_button_press_time = None
    self.copy_operation_end_time = None


  def recv(self, ign=None):
    return self.device.read()


  def handle_read(self):
    for event in self.recv():
      if event.code == ecodes.KEY_COPY:
        # A copy keypress that originated while another copy operation
        # was still running is ignored. Skip only this event, so that
        # the remaining events in the batch are still processed.
        if self.copy_operation_end_time is not None:
          if event.timestamp() < self.copy_operation_end_time:
            print("WARNING: Redundant button press detected!")
            continue

        if event.value == KEY_PRESSED:
          execute_copy_operation()
          self.copy_operation_end_time = time.time()

      elif event.code == ecodes.KEY_POWER:
        if event.value == KEY_PRESSED:
          self.power_button_press_time = event.timestamp()

        elif event.value == KEY_RELEASED:
          try:
            if self.power_button_press_time is not None:
              intv = event.timestamp() - self.power_button_press_time
              if intv > POWER_BUTTON_HOLD_PERIOD:
                execute_restart_operation()
          finally:
            self.power_button_press_time = None



def listen_for_hwkey_events():
  dev = InputDevice(INPUT_DEVICE)
  InputDeviceDispatcher(dev)
  loop()



if __name__ == "__main__":
  if (len(sys.argv) >= 2):
    if (sys.argv[1] in ('-d', '--daemonize')):
      with DaemonContext():
        listen_for_hwkey_events()
    else:
      print(USAGE)
      sys.exit(1)
  else:
    listen_for_hwkey_events()
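
If you want to check the partition selection step without plugging anything into the NAS, the parsing can be tried on its own. The sketch below assumes that `blkid -o udev` prints one blank-line-separated block of KEY=value lines per partition, which is what the script above relies on. The sample text, UUIDs and label are made up, and `parse_blkid_udev` and `pick_source` are illustrative names, not functions from the script.

```python
# Made-up sample in the shape the script expects from 'blkid -o udev'.
SAMPLE_BLKID_OUTPUT = """\
ID_FS_UUID=aaaa1111-bbbb-2222-cccc-333344445555
ID_FS_TYPE=ext4

ID_FS_LABEL=USBSTICK
ID_FS_UUID=1A2B-3C4D
ID_FS_TYPE=vfat
"""

COPY_TARGET_FILESYSTEMS = ('vfat', 'ntfs')


def parse_blkid_udev(text):
    """Turn blank-line-separated KEY=value blocks into a list of dicts."""
    devices = []
    for block in text.strip().split("\n\n"):
        info = {}
        for line in block.strip().splitlines():
            # partition() splits on the first '=' only.
            key, sep, value = line.strip().partition("=")
            if sep:
                info[key] = value
        if info:
            devices.append(info)
    return devices


def pick_source(devices, filesystems=COPY_TARGET_FILESYSTEMS):
    """Return the first device with a wanted filesystem, or None."""
    for dev in devices:
        if dev.get("ID_FS_TYPE") in filesystems:
            return dev
    return None


devices = parse_blkid_udev(SAMPLE_BLKID_OUTPUT)
chosen = pick_source(devices)
print(chosen)
```

Using `dict.get` here means a partition without a filesystem type or label is skipped quietly instead of raising `KeyError`.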