/* This software was written by Dirk Engling <erdgeist@erdgeist.org>
   It is considered beerware. Prost. Skol. Cheers or whatever.

   $Id$ */

/* System */
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Libowfat */
#include "byte.h"
#include "io.h"
#include "iob.h"
#include "ip6.h"
#include "ndelay.h"
#include "scan.h"
#include "socket.h"

/* Opentracker */
#include "ot_mutex.h"
#include "ot_stats.h"
#include "ot_vector.h"
#include "trackerlogic.h"

#ifndef WANT_SYNC_LIVE
#define WANT_SYNC_LIVE
#endif
#include "ot_livesync.h"

ot_ip6   g_serverip;
uint16_t g_serverport = 9009;
uint32_t g_tracker_id;
char     groupip_1[4] = {224, 0, 23, 5};
int      g_self_pipe[2];

/* If you have more than 10 peers, don't use this proxy
   Use 20 slots for 10 peers to have room for 10 incoming connection slots
 */
#define MAX_PEERS                         20

#define LIVESYNC_INCOMING_BUFFSIZE        (256 * 256)
#define STREAMSYNC_OUTGOING_BUFFSIZE      (256 * 256)

#define LIVESYNC_OUTGOING_BUFFSIZE_PEERS  1480
#define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash))
#define LIVESYNC_MAXDELAY                 15 /* seconds */

/* The amount of time a complete sync cycle should take */
#define OT_SYNC_INTERVAL_MINUTES          2

/* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */
#define OT_SYNC_SLEEP                     (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT))

enum { OT_SYNC_PEER4, OT_SYNC_PEER6 };
enum { FLAG_SERVERSOCKET = 1 };

/* For incoming packets */
static int64    g_socket_in = -1;
static uint8_t  g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE];

/* For outgoing packets */
static int64    g_socket_out = -1;
static uint8_t  g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS];
static uint8_t *g_peerbuffer_pos;
static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS;
static ot_time  g_next_packet_time;

static void *livesync_worker(void *args);
static void *streamsync_worker(void *args);
static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer);

void exerr(char *message) {
  fprintf(stderr, "%s\n", message);
  exit(111);
}

void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) {
  (void)event;
  (void)proto;
  (void)event_data;
}

void livesync_bind_mcast(ot_ip6 ip, uint16_t port) {
  char  tmpip[4] = {0, 0, 0, 0};
  char *v4ip;

  if (!ip6_isv4mapped(ip))
    exerr("v6 mcast support not yet available.");
  v4ip = ip + 12;

  if (g_socket_in != -1)
    exerr("Error: Livesync listen ip specified twice.");

  if ((g_socket_in = socket_udp4()) < 0)
    exerr("Error: Cant create live sync incoming socket.");
  ndelay_off(g_socket_in);

  if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1)
    exerr("Error: Cant bind live sync incoming socket.");

  if (socket_mcjoin4(g_socket_in, groupip_1, v4ip))
    exerr("Error: Cant make live sync incoming socket join mcast group.");

  if ((g_socket_out = socket_udp4()) < 0)
    exerr("Error: Cant create live sync outgoing socket.");
  if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1)
    exerr("Error: Cant bind live sync outgoing socket.");

  socket_mcttl4(g_socket_out, 1);
  socket_mcloop4(g_socket_out, 1);
}

size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
  int          exactmatch;
  ot_torrent  *torrent;
  ot_peerlist *peer_list;
  ot_peer     *peer_dest;
  ot_vector   *torrents_list = mutex_bucket_lock_by_hash(hash);
  size_t       compare_size  = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size);

  torrent                    = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch);
  if (!torrent)
    return -1;

  if (!exactmatch) {
    /* Create a new torrent entry, then */
    memcpy(torrent->hash, hash, sizeof(ot_hash));

    if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) {
      vector_remove_torrent(torrents_list, torrent);
      mutex_bucket_unlock_by_hash(hash, 0);
      return -1;
    }

    byte_zero(torrent->peer_list6, sizeof(ot_peerlist));
    byte_zero(torrent->peer_list4, sizeof(ot_peerlist));
  }

  peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;

  /* Check for peer in torrent */
  peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch);
  if (!peer_dest) {
    mutex_bucket_unlock_by_hash(hash, 0);
    return -1;
  }
  /* Tell peer that it's fresh */
  OT_PEERTIME(peer, peer_size) = 0;

  /* If we hadn't had a match create peer there */
  if (!exactmatch) {
    peer_list->peer_count++;
    if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING)
      peer_list->seed_count++;
  }
  memcpy(peer_dest, peer, peer_size);
  mutex_bucket_unlock_by_hash(hash, 0);
  return 0;
}

size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) {
  int         exactmatch;
  ot_vector  *torrents_list = mutex_bucket_lock_by_hash(hash);
  ot_torrent *torrent       = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch);

  if (exactmatch) {
    ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4;
    switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) {
    case 2:
      peer_list->seed_count--; /* Intentional fallthrough */
    case 1:
      peer_list->peer_count--; /* Intentional fallthrough */
    default:
      break;
    }
  }

  mutex_bucket_unlock_by_hash(hash, 0);
  return 0;
}

void free_peerlist(ot_peerlist *peer_list) {
  if (peer_list->peers.data) {
    if (OT_PEERLIST_HASBUCKETS(peer_list)) {
      ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data);

      while (peer_list->peers.size--)
        free(bucket_list++->data);
    }
    free(peer_list->peers.data);
  }
  free(peer_list);
}

static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) {
  int off = sizeof(g_tracker_id) + sizeof(uint32_t);

  fprintf(stderr, ".");

  while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) {
    ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash));
    ot_hash *hash = (ot_hash *)(g_inbuffer + off);

    if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED)
      remove_peer_from_torrent_proxy(*hash, peer, peer_size);
    else
      add_peer_to_torrent_proxy(*hash, peer, peer_size);

    off += sizeof(ot_hash) + peer_size;
  }
}

int usage(char *self) {
  fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self);
  return 0;
}

enum {
  FLAG_OUTGOING      = 0x80,

  FLAG_DISCONNECTED  = 0x00,
  FLAG_CONNECTING    = 0x01,
  FLAG_WAITTRACKERID = 0x02,
  FLAG_CONNECTED     = 0x03,

  FLAG_MASK          = 0x07
};

#define PROXYPEER_NEEDSCONNECT(flag)     ((flag) == FLAG_OUTGOING)
#define PROXYPEER_ISCONNECTED(flag)      (((flag) & FLAG_MASK) == FLAG_CONNECTED)
#define PROXYPEER_SETDISCONNECTED(flag)  (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED)
#define PROXYPEER_SETCONNECTING(flag)    (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING)
#define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID)
#define PROXYPEER_SETCONNECTED(flag)     (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED)

typedef struct {
  int      state;             /* Whether we want to connect, how far our handshake is, etc. */
  ot_ip6   ip;                /* The peer to connect to */
  uint16_t port;              /* The peers port */
  uint8_t  indata[8192 * 16]; /* Any data not processed yet */
  size_t   indata_length;     /* Length of unprocessed data */
  uint32_t tracker_id;        /* How the other end greeted */
  int64    fd;                /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */
  io_batch outdata;           /* The iobatch containing our sync data */

  size_t   packet_tcount;     /* Number of unprocessed torrents in packet we currently receive */
  uint8_t  packet_tprefix;    /* Prefix byte for all torrents in current packet */
  uint8_t  packet_type;       /* Type of current packet */
  uint32_t packet_tid;        /* Tracker id for current packet */

} proxy_peer;
static void process_indata(proxy_peer *peer);

void reset_info_block(proxy_peer *peer) {
  peer->indata_length = 0;
  peer->tracker_id    = 0;
  peer->fd            = -1;
  peer->packet_tcount = 0;
  iob_reset(&peer->outdata);
  PROXYPEER_SETDISCONNECTED(peer->state);
}

/* Number of connections to peers
 * If a peer's IP is set, we try to reconnect, when the connection drops
 * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it
 * Multiple connections to/from the same ip are okay, if tracker_id doesn't match
 * Reconnect attempts occur only twice a minute
 */
static int        g_connection_count;
static ot_time    g_connection_reconn;
static proxy_peer g_connections[MAX_PEERS];

static void handle_reconnects(void) {
  int i;
  for (i = 0; i < g_connection_count; ++i)
    if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) {
      int64 newfd = socket_tcp6();
      fprintf(stderr, "(Re)connecting to peer...");
      if (newfd < 0)
        continue; /* No socket for you */
      io_fd(newfd);
      if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) {
        io_close(newfd);
        continue;
      }
      if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) {
        close(newfd);
        continue;
      }
      io_wantwrite(newfd); /* So we will be informed when it is connected */
      io_setcookie(newfd, g_connections + i);

      /* Prepare connection info block */
      reset_info_block(g_connections + i);
      g_connections[i].fd = newfd;
      PROXYPEER_SETCONNECTING(g_connections[i].state);
    }
  g_connection_reconn = time(NULL) + 30;
}

/* Handle incoming connection requests, check against whitelist */
static void handle_accept(int64 serversocket) {
  int64  newfd;
  ot_ip6 ip;
  uint16 port;

  while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) {

    /* XXX some access control */

    /* Put fd into a non-blocking mode */
    io_nonblock(newfd);

    if (!io_fd(newfd))
      io_close(newfd);
    else {
      /* Find a new home for our incoming connection */
      int i;
      for (i = 0; i < MAX_PEERS; ++i)
        if (g_connections[i].state == FLAG_DISCONNECTED)
          break;
      if (i == MAX_PEERS) {
        fprintf(stderr, "No room for incoming connection.");
        close(newfd);
        continue;
      }

      /* Prepare connection info block */
      reset_info_block(g_connections + i);
      PROXYPEER_SETCONNECTING(g_connections[i].state);
      g_connections[i].port = port;
      g_connections[i].fd   = newfd;

      io_setcookie(newfd, g_connections + i);

      /* We expect the connecting side to begin with its tracker_id */
      io_wantread(newfd);
    }
  }

  return;
}

/* New sync data on the stream */
static void handle_read(int64 peersocket) {
  int         i;
  int64       datalen;
  uint32_t    tracker_id;
  proxy_peer *peer = io_getcookie(peersocket);

  if (!peer) {
    /* Can't happen ;) */
    io_close(peersocket);
    return;
  }
  switch (peer->state & FLAG_MASK) {
  case FLAG_DISCONNECTED:
    io_close(peersocket);
    break; /* Shouldnt happen */
  case FLAG_CONNECTING:
  case FLAG_WAITTRACKERID:
    /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now)
       This also catches 0 bytes reads == EOF and negative values, denoting connection errors */
    if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id))
      goto close_socket;

    /* See, if we already have a connection to that peer */
    for (i = 0; i < MAX_PEERS; ++i)
      if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) {
        fprintf(stderr, "Peer already connected. Closing connection.\n");
        goto close_socket;
      }

    /* Also no need for soliloquy */
    if (tracker_id == g_tracker_id)
      goto close_socket;

    /* The new connection is good, send our tracker_id on incoming connections */
    if (peer->state == FLAG_CONNECTING)
      if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id))
        goto close_socket;

    peer->tracker_id = tracker_id;
    PROXYPEER_SETCONNECTED(peer->state);

    if (peer->state & FLAG_OUTGOING)
      fprintf(stderr, "succeeded.\n");
    else
      fprintf(stderr, "Incoming connection successful.\n");

    break;
  close_socket:
    fprintf(stderr, "Handshake incomplete, closing socket\n");
    io_close(peersocket);
    reset_info_block(peer);
    break;
  case FLAG_CONNECTED:
    /* Here we acutally expect data from peer
       indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */
    datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length);
    if (!datalen || datalen < -1) {
      fprintf(stderr, "Connection closed by remote peer.\n");
      io_close(peersocket);
      reset_info_block(peer);
    } else if (datalen > 0) {
      peer->indata_length += datalen;
      process_indata(peer);
    }
    break;
  }
}

/* Can write new sync data to the stream */
static void handle_write(int64 peersocket) {
  proxy_peer *peer = io_getcookie(peersocket);

  if (!peer) {
    /* Can't happen ;) */
    io_close(peersocket);
    return;
  }

  switch (peer->state & FLAG_MASK) {
  case FLAG_DISCONNECTED:
  default: /* Should not happen */
    io_close(peersocket);
    break;
  case FLAG_CONNECTING:
    /* Ensure that the connection is established and handle connection error */
    if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) {
      fprintf(stderr, "failed\n");
      reset_info_block(peer);
      io_close(peersocket);
      break;
    }

    if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) {
      PROXYPEER_SETWAITTRACKERID(peer->state);
      io_dontwantwrite(peersocket);
      io_wantread(peersocket);
    } else {
      fprintf(stderr, "Handshake incomplete, closing socket\n");
      io_close(peersocket);
      reset_info_block(peer);
    }
    break;
  case FLAG_CONNECTED:
    switch (iob_send(peersocket, &peer->outdata)) {
    case 0: /* all data sent */
      io_dontwantwrite(peersocket);
      break;
    case -3: /* an error occured */
      io_close(peersocket);
      reset_info_block(peer);
      break;
    default: /* Normal operation or eagain */
      break;
    }
    break;
  }

  return;
}

static void server_mainloop() {
  int64 sock;

  /* inlined livesync_init() */
  memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start));
  g_peerbuffer_pos = g_peerbuffer_start;
  memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id));
  uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER);
  g_peerbuffer_pos   += sizeof(g_tracker_id) + sizeof(uint32_t);
  g_next_packet_time  = time(NULL) + LIVESYNC_MAXDELAY;

  while (1) {
    /* See if we need to connect to anyone */
    if (time(NULL) > g_connection_reconn)
      handle_reconnects();

    /* Wait for io events until next approx reconn check time */
    io_waituntil2(30 * 1000);

    /* Loop over readable sockets */
    while ((sock = io_canread()) != -1) {
      const void *cookie = io_getcookie(sock);
      if ((uintptr_t)cookie == FLAG_SERVERSOCKET)
        handle_accept(sock);
      else
        handle_read(sock);
    }

    /* Loop over writable sockets */
    while ((sock = io_canwrite()) != -1)
      handle_write(sock);

    livesync_ticker();
  }
}

static void panic(const char *routine) {
  fprintf(stderr, "%s: %s\n", routine, strerror(errno));
  exit(111);
}

static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) {
  int64 sock = socket_tcp6();

  if (socket_bind6_reuse(sock, ip, port, 0) == -1)
    panic("socket_bind6_reuse");

  if (socket_listen(sock, SOMAXCONN) == -1)
    panic("socket_listen");

  if (!io_fd(sock))
    panic("io_fd");

  io_setcookie(sock, (void *)FLAG_SERVERSOCKET);
  io_wantread(sock);
  return sock;
}

static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) {
  const char *s = src;
  int         off, bracket = 0;
  while (isspace(*s))
    ++s;
  if (*s == '[')
    ++s, ++bracket; /* for v6 style notation */
  if (!(off = scan_ip6(s, ip)))
    return 0;
  s += off;
  if (*s == 0 || isspace(*s))
    return s - src;
  if (*s == ']' && bracket)
    ++s;
  if (!ip6_isv4mapped(ip)) {
    if ((bracket && *(s) != ':') || (*(s) != '.'))
      return 0;
    s++;
  } else {
    if (*(s++) != ':')
      return 0;
  }
  if (!(off = scan_ushort(s, port)))
    return 0;
  return off + s - src;
}

int main(int argc, char **argv) {
  static pthread_t sync_in_thread_id;
  static pthread_t sync_out_thread_id;
  ot_ip6           serverip;
  uint16_t         tmpport;
  int              scanon = 1, lbound = 0, sbound = 0;

  srandom(time(NULL));
#ifdef WANT_ARC4RANDOM
  g_tracker_id = arc4random();
#else
  g_tracker_id = random();
#endif

  while (scanon) {
    switch (getopt(argc, argv, ":l:c:L:h")) {
    case -1:
      scanon = 0;
      break;
    case 'l':
      tmpport = 0;
      if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
        usage(argv[0]);
        exit(1);
      }
      ot_try_bind(serverip, tmpport);
      ++sbound;
      break;
    case 'c':
      if (g_connection_count > MAX_PEERS / 2)
        exerr("Connection limit exceeded.\n");
      tmpport = 0;
      if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) {
        usage(argv[0]);
        exit(1);
      }
      g_connections[g_connection_count++].state = FLAG_OUTGOING;
      break;
    case 'L':
      tmpport = 9696;
      if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) {
        usage(argv[0]);
        exit(1);
      }
      livesync_bind_mcast(serverip, tmpport);
      ++lbound;
      break;
    default:
    case '?':
      usage(argv[0]);
      exit(1);
    }
  }

  if (!lbound)
    exerr("No livesync port bound.");
  if (!g_connection_count && !sbound)
    exerr("No streamsync port bound.");
  pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL);
  pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL);

  server_mainloop();
  return 0;
}

static void *streamsync_worker(void *args) {
  (void)args;
  while (1) {
    int bucket;
    /* For each bucket... */
    for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
      /* Get exclusive access to that bucket */
      ot_vector *torrents_list = mutex_bucket_lock(bucket);
      size_t     tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0;
      size_t     mem, mem_a = 0, mem_b = 0;
      uint8_t   *ptr = 0, *ptr_a, *ptr_b, *ptr_c;

      if (!torrents_list->size)
        goto unlock_continue;

      /* For each torrent in this bucket.. */
      for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
        /* Address torrents members */
        ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list;
        switch (peer_list->peer_count) {
        case 2:
          count_two++;
          break;
        case 1:
          count_one++;
          break;
        case 0:
          break;
        default:
          count_def++;
          count_peers += peer_list->peer_count;
        }
      }

      /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */
      mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7;

      fprintf(stderr, "Mem: %zd\n", mem);

      ptr = ptr_a = ptr_b = ptr_c = malloc(mem);
      if (!ptr)
        goto unlock_continue;

      if (count_one > 4 || !count_def) {
        mem_a     = 1 + 1 + 2 + count_one * (19 + 7);
        ptr_b    += mem_a;
        ptr_c    += mem_a;
        ptr_a[0]  = 1;                                     /* Offset 0: packet type 1 */
        ptr_a[1]  = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
        ptr_a[2]  = count_one >> 8;
        ptr_a[3]  = count_one & 255;
        ptr_a    += 4;
      } else
        count_def += count_one;

      if (count_two > 4 || !count_def) {
        mem_b     = 1 + 1 + 2 + count_two * (19 + 14);
        ptr_c    += mem_b;
        ptr_b[0]  = 2;                                     /* Offset 0: packet type 2 */
        ptr_b[1]  = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
        ptr_b[2]  = count_two >> 8;
        ptr_b[3]  = count_two & 255;
        ptr_b    += 4;
      } else
        count_def += count_two;

      if (count_def) {
        ptr_c[0]  = 0;                                     /* Offset 0: packet type 0 */
        ptr_c[1]  = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */
        ptr_c[2]  = count_def >> 8;
        ptr_c[3]  = count_def & 255;
        ptr_c    += 4;
      }

      /* For each torrent in this bucket.. */
      for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) {
        /* Address torrents members */
        ot_torrent  *torrent   = ((ot_torrent *)(torrents_list->data)) + tor_offset;
        ot_peerlist *peer_list = torrent->peer_list;
        ot_peer     *peers     = (ot_peer *)(peer_list->peers.data);
        uint8_t    **dst;

        /* Determine destination slot */
        count_peers = peer_list->peer_count;
        switch (count_peers) {
        case 0:
          continue;
        case 1:
          dst = mem_a ? &ptr_a : &ptr_c;
          break;
        case 2:
          dst = mem_b ? &ptr_b : &ptr_c;
          break;
        default:
          dst = &ptr_c;
          break;
        }

        /* Copy tail of info_hash, advance pointer */
        memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1);
        *dst += sizeof(ot_hash) - 1;

        /* Encode peer count */
        if (dst == &ptr_c)
          while (count_peers) {
            if (count_peers <= 0x7f)
              *(*dst)++ = count_peers;
            else
              *(*dst)++ = 0x80 | (count_peers & 0x7f);
            count_peers >>= 7;
          }

        /* Copy peers */
        count_peers = peer_list->peer_count;
        while (count_peers--) {
          memcpy(*dst, peers++, OT_IP_SIZE + 3);
          *dst += OT_IP_SIZE + 3;
        }
        free_peerlist(peer_list);
      }

      free(torrents_list->data);
      memset(torrents_list, 0, sizeof(*torrents_list));
    unlock_continue:
      mutex_bucket_unlock(bucket, 0);

      if (ptr) {
        int i;

        if (ptr_b > ptr_c)
          ptr_c = ptr_b;
        if (ptr_a > ptr_c)
          ptr_c = ptr_a;
        mem = ptr_c - ptr;

        for (i = 0; i < MAX_PEERS; ++i) {
          if (PROXYPEER_ISCONNECTED(g_connections[i].state)) {
            void *tmp = malloc(mem);
            if (tmp) {
              memcpy(tmp, ptr, mem);
              iob_addbuf_free(&g_connections[i].outdata, tmp, mem);
              io_wantwrite(g_connections[i].fd);
            }
          }
        }

        free(ptr);
      }
      usleep(OT_SYNC_SLEEP);
    }
  }
  return 0;
}

static void livesync_issue_peersync() {
  socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT);
  g_peerbuffer_pos   = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t);
  g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY;
}

void livesync_ticker() {
  /* livesync_issue_peersync sets g_next_packet_time */
  if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id))
    livesync_issue_peersync();
}

static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) {
  //  unsigned int i;

  *g_peerbuffer_pos = prefix;
  memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1);
  memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1);

#if 0
  /* Dump info_hash */
  for( i=0; i<sizeof(ot_hash); ++i )
    printf( "%02X", g_peerbuffer_pos[i] );
  putchar( ':' );
#endif
  g_peerbuffer_pos += sizeof(ot_hash);
#if 0
  printf( "%hhu.%hhu.%hhu.%hhu:%hu (%02X %02X)\n", g_peerbuffer_pos[0], g_peerbuffer_pos[1], g_peerbuffer_pos[2], g_peerbuffer_pos[3],
    g_peerbuffer_pos[4] | ( g_peerbuffer_pos[5] << 8 ), g_peerbuffer_pos[6], g_peerbuffer_pos[7] );
#endif
  g_peerbuffer_pos += sizeof(ot_peer);

  if (g_peerbuffer_pos >= g_peerbuffer_highwater)
    livesync_issue_peersync();
}

static void process_indata(proxy_peer *peer) {
  size_t   consumed, peers;
  uint8_t *data    = peer->indata, *hash;
  uint8_t *dataend = data + peer->indata_length;

  while (1) {
    /* If we're not inside of a packet, make a new one */
    if (!peer->packet_tcount) {
      /* Ensure the header is complete or postpone processing */
      if (data + 4 > dataend)
        break;
      peer->packet_type     = data[0];
      peer->packet_tprefix  = data[1];
      peer->packet_tcount   = data[2] * 256 + data[3];
      data                 += 4;
      printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount);
    }

    /* Ensure size for a minimal torrent block */
    if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend)
      break;

    /* Advance pointer to peer count or peers */
    hash   = data;
    data  += sizeof(ot_hash) - 1;

    /* Type 0 has peer count encoded before each peers */
    peers  = peer->packet_type;
    if (!peers) {
      int shift = 0;
      do
        peers |= (0x7f & *data) << (7 * shift);
      while (*(data++) & 0x80 && shift++ < 6);
    }
#if 0
printf( "peers: %zd\n", peers );
#endif
    /* Ensure enough data being read to hold all peers */
    if (data + (OT_IP_SIZE + 3) * peers > dataend) {
      data = hash;
      break;
    }
    while (peers--) {
      livesync_proxytell(peer->packet_tprefix, hash, data);
      data += OT_IP_SIZE + 3;
    }
    --peer->packet_tcount;
  }

  consumed = data - peer->indata;
  memmove(peer->indata, data, peer->indata_length - consumed);
  peer->indata_length -= consumed;
}

static void *livesync_worker(void *args) {
  (void)args;
  while (1) {
    ot_ip6   in_ip;
    uint16_t in_port;
    size_t   datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port);

    /* Expect at least tracker id and packet type */
    if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t)))
      continue;
    if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) {
      /* drop packet coming from ourselves */
      continue;
    }
    switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) {
    case OT_SYNC_PEER4:
      livesync_handle_peersync(datalen, OT_PEER_SIZE4);
      break;
    case OT_SYNC_PEER6:
      livesync_handle_peersync(datalen, OT_PEER_SIZE6);
      break;
    default:
      // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) );
      break;
    }
  }
  return 0;
}