on exchange trading engine

本文分两部分: ordering 和 sequencing+matching. 后续的部分, 也就是clearing+settlement, 同样很复杂/重要, 但本文不包括.

Ordering

系统实际实现了多种OrderType以及flag, 目前开放给用户的有三种 (以下说明均以BUY方向为例):

LIMIT, MARKET, STOP_LIMIT

MARKET - 市价单

LIMIT - 限价单

STOP_LIMIT - 当市场价达到triggerOn价格或更优时 (对于BUY来说是市场价达到或高于triggerOn), 转为限价单

Decimal places

Sequencing + Matching

OrderService在几个地方都有用到; 就jar包而言, 它被打在api里面.

下单唯一入口API: /orders

/**
 * Creates a new order for the current user and publishes it to the exchange message queue.
 *
 * <p>Rejections (user may not trade, validation failure, unsupported order type) are reported
 * as a {@code STATUS_FAIL} response body rather than by throwing {@code ApiException}.
 *
 * <p>Publishing to MQ is best-effort: when the send fails, or when it is skipped because too
 * many errors have been counted, the order is appended to {@code messageList} and the request
 * still returns {@code STATUS_SUCCESS} with the created order.
 *
 * @param orderBean order info.
 * @return a response wrapping the created {@code Order}, or the failure reason.
 */
@PostMapping("/orders")
@MetricWith("api_create_order")
public ResponseWrapper createOrder(@RequestBody OrderBean orderBean) {
    // can trade?
    if (!AuthContext.getCurrentAuth().canTrade) {
        return ResponseWrapper.ok(ResponseWrapper.STATUS_FAIL, ApiError.USER_CANNOT_TRADE);
    }
    // validate:
    try {
        orderBean.validate(this.orderFeaturesConfiguration);
    } catch (ApiException e) {
        return ResponseWrapper.ok(ResponseWrapper.STATUS_FAIL, e);
    }
    final String traceId = AuthContext.getTraceId();
    // pre-process:
    preOrderProcessService.preProcess(traceId, AuthContext.getCurrentUser().id, orderBean.type, orderBean.symbol);

    logger.info("[{}] try create order: {}", traceId, orderBean);
    User user = AuthContext.getRequiredCurrentUser();
    final ValidatedOrderBean vbean;

    switch (orderBean.type) {
        // all four tradable types are converted the same way:
        case BUY_LIMIT:
        case SELL_LIMIT:
        case BUY_MARKET:
        case SELL_MARKET:
            vbean = new ValidatedOrderBean();
            vbean.copyPropertiesFrom(orderBean);
            vbean.features = orderBean.toFeatures();
            break;
        // cancel types (and anything else) cannot be submitted through this endpoint:
        case CANCEL_BUY:
        case CANCEL_SELL:
        default:
            return ResponseWrapper.ok(ResponseWrapper.STATUS_FAIL, "Unsupported order type.");
    }
    Order order = orderService.createOrder(traceId, user.id, vbean);
    // lazily resolve the MQ namespace from the producer's name-server address:
    if (StringUtils.isBlank(namespace)) {
        this.namespace = NameAddrUtils.parseInstanceIdFromEndpoint(rocketMQTemplate.getProducer().getNamesrvAddr());
    }
    // once 3 or more errors have been counted, skip the MQ send entirely (circuit-breaker-like):
    if (errcount.intValue() < 3) {
        try {
            rocketMQTemplate.send(NamespaceUtil.wrapNamespace(namespace, Messaging.Queue.MESSAGE_TO_EXCHANGE + ":order_form"),
                    MessageBuilder.withPayload(JSONObject.toJSON(order)).build());
        } catch (Exception e) {
            // log through the logger (with trace id and stack trace) instead of printStackTrace():
            logger.error("[" + traceId + "] failed to send order " + order.id + " to MQ", e);
            errcount.incrementAndGet();
            logger.info("current error {}", errcount);
            // keep the order so it is not lost:
            messageList.add(order);
            isSend = false;
            // start the heartbeat detection once:
            if (!heartbeat) {
                detectionRocketMQ();
                heartbeat = true;
            }
        }
    } else {
        errcount.incrementAndGet();
        logger.info("current error {}", errcount);
        messageList.add(order);
    }
    return ResponseWrapper.ok(ResponseWrapper.STATUS_SUCCESS, order);
}

真正做事的是orderService.createOrder(traceId, user.id, vbean), doSendMessage也在其中; 它会进一步delegate到OrderHandler:

/**
 * Creates a BUY_LIMIT order: computes the quote amount to freeze, verifies the user's
 * SPOT_AVAILABLE quote balance covers it, saves the order and then freezes the amount.
 *
 * @param traceId trace id passed to fee lookup and to the freeze call.
 * @param userId  owner of the order.
 * @param bean    validated order input (symbol, price, amount are read here).
 * @return the saved order.
 * @throws ApiException with ACCOUNT_FREEZE_FAILED when the available balance is below the freeze amount.
 */
public Order createBuyLimitOrder(String traceId, long userId, ValidatedOrderBean bean) {
  final long ts = System.currentTimeMillis();
  final Symbol symbol = Symbol.valueOf(bean.symbol);
  final FeeRates feeRates = getFeeRates(traceId, ts, userId, bean);

  // frozen: price * amount, scaled by (1 + taker rate) when alwaysChargeQuote is set
  BigDecimal freezeAmount = bean.price.multiply(bean.amount);
  if (this.alwaysChargeQuote) {
    freezeAmount = freezeAmount.multiply(BigDecimal.ONE.add(feeRates.taker));
  }

  // reject early when the available quote balance cannot cover the freeze:
  final boolean covered = accountService.getSpotAccount(userId, symbol.quote, AccountType.SPOT_AVAILABLE).balance
      .compareTo(freezeAmount) >= 0;
  if (!covered) {
    throw new ApiException(ApiError.ACCOUNT_FREEZE_FAILED);
  }

  // create order, persist it, then freeze the funds against the new order id:
  final Order created = prepareOrder(userId, OrderType.BUY_LIMIT, bean, feeRates, this.alwaysChargeQuote, ts);
  db.save(created);
  accountService.freeze(traceId, FlowType.TRADE_FREEZE, "CREATE_ORDER", created.id, userId, symbol.quote.getName(),
      freezeAmount);
  return created;
}

上面每一行都很关键, 创建order, persist, freeze amount

/**
 * Builds a new, unsaved {@code Order} in SUBMITTED status from the given values.
 *
 * <p>{@code filledAmount} and {@code fee} start at zero; a {@code null} triggerOn is stored as
 * zero; {@code createdAt} and {@code updatedAt} are both set to {@code ts}. The {@code mining}
 * parameter is not used by this method.
 *
 * @return the populated order (not persisted here).
 */
Order prepareOrder(long userId, OrderType type, String source, String symbol, BigDecimal price, BigDecimal amount,
    BigDecimal triggerOn, String mining, int features, FeeRates feeRates, boolean chargeQuote, long ts) {
  final Order result = new Order();

  // who / where:
  result.userId = userId;
  result.source = source;
  result.symbol = symbol;
  result.sequenceIndex = Symbol.valueOf(symbol).sequenceIndex;

  // what:
  result.type = type;
  result.price = price;
  result.amount = amount;
  result.features = features;
  if (triggerOn == null) {
    result.triggerOn = BigDecimal.ZERO;
  } else {
    result.triggerOn = triggerOn;
  }

  // nothing filled or charged yet:
  result.filledAmount = BigDecimal.ZERO;
  result.fee = BigDecimal.ZERO;

  // fee configuration:
  result.takerFeeRate = feeRates.taker;
  result.makerFeeRate = feeRates.maker;
  result.chargeQuote = chargeQuote;

  // the order is in SUBMITTED status at this point:
  result.status = OrderStatus.SUBMITTED;
  result.updatedAt = ts;
  result.createdAt = ts;
  return result;
}

发送的是sequence message, payload只带少量信息:

/**
 * Sends a sequence message for the given order to the ORDER_TO_SEQUENCE queue.
 * The message carries only the trace id, sequence index, symbol and order id.
 */
void doSendMessage(String traceId, Order order) {
	final SequenceMessage message = new SequenceMessage(traceId, order.sequenceIndex, order.symbol, order.id);
	sequenceMessageProducer.sendMessage(Messaging.Queue.ORDER_TO_SEQUENCE, message);
	logger.info("[{}] sent order message {} to sequence queue ok.", traceId, order.id);
}
/**
 * Builds (but does not start) the thread that bootstraps sequencing.
 *
 * <p>When run, the thread: creates one SequenceService per slot and re-sends each slot's
 * recent sequenced orders to ORDER_TO_SEQUENCE; initializes the single shared sequence counter
 * from the largest last-sequence-id of all services; starts the services; creates the
 * ORDER_TO_SEQUENCE consumer; re-sequences submitted orders; then idles until {@code running}
 * becomes false or the thread is interrupted, after which it closes the consumer and calls
 * {@code System.exit(1)}.
 *
 * @return the configured, not-yet-started thread.
 */
Thread createJob() {
  Thread thread = new Thread(() -> {
    logger.info("[SequenceGroup] start sequence job...");

    this.serviceMap = new HashMap<>(SequenceUtil.SEQUENCE_SLOTS);

    // init one sequence service per slot (SequenceUtil.SEQUENCE_SLOTS of them):
    for (int i = 0; i < SequenceUtil.SEQUENCE_SLOTS; i++) {
      SequenceService sequenceService = new SequenceService(super.db, i, this.sequenceHandler,
          this.orderMessageProducer, this);
      // IMPORTANT: re-send recent sequenced messages in case these messages were
      // sequenced, but sending them to MQ failed:
      List<? extends AbstractOrderSequence> sequences = sequenceService.getRecentOrderSequences();
      // NOTE(review): db.get(...) is dereferenced without a null check — confirm it cannot
      // return null for an orderId that has a sequence record.
      List<SequenceMessage> msgs = sequences.stream().map(seq -> db.get(Order.class, seq.orderId))
          .map(order -> new SequenceMessage("R_ORDER_" + order.id, order.sequenceIndex, order.symbol,
              order.id))
          .collect(Collectors.toList());
      logger.info("[SequenceGroup] re-send recent {} sequenced orders...", msgs.size());
      sequenceMessageProducer.sendMessages(Messaging.Queue.ORDER_TO_SEQUENCE, msgs);

      this.serviceMap.put(i, sequenceService);
    }

    // find max sequence id and max order id across all services (0 when there are none):
    final long globalMax = this.serviceMap.values().stream().mapToLong(s -> s.findLastSequenceId()).max()
        .orElse(0);
    this.lastScannedOrderId = this.serviceMap.values().stream().mapToLong(s -> s.findLastOrderId()).max()
        .orElse(0);

    // IMPORTANT: there is only ONE global sequence AtomicLong, shared by every service:
    logger.info("[SequenceGroup] set global sequence id to {}.", globalMax);
    this.sequence = new AtomicLong(globalMax);

    // start services:
    for (SequenceService service : this.serviceMap.values()) {
      service.startJob(sequence);
    }

    // init consumer:
    logger.info("[SequenceGroup] create message consumer for sequence...");
    sequenceMessageConsumer = messagingFactory
        .createMessageConsumerBuilder(Messaging.Queue.ORDER_TO_SEQUENCE, SequenceMessage.class)
        .withConsumeConcurrent(1) // <- MUST be 1 for sequence is strictly ordered
        .withConsumeBatchSize(BATCH_MESSAGES) // batch process
        .withBatchMessageListener(this) // listener
        .withUniqueGroupName("Sequence") // group name = "Sequence_<ip:port>"
        .build();

    // re-send submitted orders: rewind the scan position by 100000 order ids (never below 0)
    // before scanning:
    this.lastScannedOrderId = Math.max(0, this.lastScannedOrderId - 100000);
    reSequenceSubmittedOrders();

    // idle until stopped; an interrupt also ends the loop:
    while (running) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        break;
      }
    }
    // close message consumer:
    logger.info("[SequenceGroup] close message consumer for sequence...");
    sequenceMessageConsumer.close();
    // leaving the loop always terminates the whole JVM with exit code 1:
    System.exit(1);
  });
  return thread;
}

reSequenceSubmittedOrders()负责处理之前已经是SUBMITTED状态、已经落库, 但是还没有定序、还没有创建sequenceMessage的订单: 为它们重新构造sequenceMessage并发送. 这个上限发100_000, 每60秒做一遍. sequenceService.getRecentOrderSequences()取的则是已经定序、已经创建sequenceMessage而且落库了, 但是发送MQ失败的那些; 对它们重新发送, 这个只发200.

// One batch of sequencing (enclosing method not shown; presumably invoked repeatedly by the
// sequence thread — TODO confirm): drain queued sequence messages, sequence them in one
// batch, and forward the resulting orders to the SEQUENCE_TO_MATCH queue.
try {
  List<SequenceMessage> msgs = new ArrayList<>();
  // only warns about backlog; does not change how many messages are taken:
  if (this.queue.size() > 100) {
    logger.warn("[sequence{}] cached queue size too long: {}", this.index, this.queue.size());
  }
  // poll first 50 messages immediately (non-blocking; empty polls are skipped):
  for (int i = 0; i < 50; i++) {
    SequenceMessage msg = this.queue.poll();
    if (msg != null) {
      msgs.add(msg);
    }
  }
  // then poll one more message, waiting at most 50 ms — so a batch holds at most 51 messages:
  SequenceMessage msg = this.queue.poll(50, TimeUnit.MILLISECONDS);
  if (msg != null) {
    msgs.add(msg);
  }
  // nothing to do in this round:
  if (msgs.isEmpty()) {
    return;
  }
  logger.info("[sequence{}] do batch sequence for {} sequence messages... ", this.index, msgs.size());
  List<Order> orders = sequenceHandler.doSequenceOrders(this.orderSequenceClass, this.sharedSequence,
      this.symbolLastSequenceIdHolders, msgs);
  // send messages:
  List<OrderMessage> messages = orders.stream()
      .map(order -> OrderMessageUtil.createOrderMessage("ORDER_" + order.id, order))
      .collect(Collectors.toList());
  // if message sent failed, the sequence should crash and restart:
  orderMessageProducer.sendMessages(Messaging.Queue.SEQUENCE_TO_MATCH, messages);
} catch (InterruptedException e) {
  // NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored.
  logger.warn("[sequence{}] sequence thread was interrupted.", this.index);
} catch (Exception e) {
  logger.error("[sequence" + this.index + "] exception when do sequence", e);
  // any other failure is fatal: shut down, mark as crashed, and exit the JVM with a
  // per-slot exit code (100 + index).
  shutdown();
  check.setCrashed();
  System.exit(100 + this.index);
  // only reached if System.exit(...) itself does not terminate the JVM:
  throw new RuntimeException(e);
}

去重不够强? doSequenceOrders里的去重只有这一句: if (set.contains(orderId)) { continue; }

Batch? AbstractOrderSequence seq = db.from(orderSequenceClass).where("orderId = ?", orderId).first();

相对时间不重要, 绝对顺序重要

// Stop-order routing. A message with triggerOn > 0 is treated as a stop order, but only when
// a market price is already known (marketPrice > 0). Messages that are not parked or
// cancelled here fall through to the code after this if-block.
final boolean isStop = msg.triggerOn.signum() > 0;
if (isStop && this.marketPrice.signum() > 0) {
    final boolean isTrailing = OrderFeature.hasFeature(msg.features, OrderFeature.TRAILING_STOP);
    switch (msg.type) {
        case BUY_LIMIT:
        case BUY_MARKET:
            // trailing stops are always parked, regardless of the current market price:
            if (isTrailing) {
                // put it into stop-buy-book and trailing list:
                StopOrderItem item = new StopOrderItem(msg, this.marketPrice);
                this.stopBuyBook.add(item);
                this.trailingBuyList.addItem(item);
                return; // DO NOT continue process
            }
            // for buy: market price < trigger price means not triggered yet:
            if (this.marketPrice.compareTo(msg.triggerOn) < 0) {
                // put it into stop-buy-book:
                StopOrderItem item = new StopOrderItem(msg, this.marketPrice);
                this.stopBuyBook.add(item);
                return; // DO NOT continue process
            }
            // market price >= trigger price: already triggered, fall through
            break;

        case SELL_LIMIT:
        case SELL_MARKET:
            // trailing stops are always parked, regardless of the current market price:
            if (isTrailing) {
                // put it into stop-sell-book and trailing list:
                StopOrderItem item = new StopOrderItem(msg, this.marketPrice);
                this.stopSellBook.add(item);
                this.trailingSellList.addItem(item);
                return; // DO NOT continue process
            }
            // for sell: market price > trigger price means not triggered yet:
            if (this.marketPrice.compareTo(msg.triggerOn) > 0) {
                // put it into stop-sell-book:
                StopOrderItem item = new StopOrderItem(msg, this.marketPrice);
                this.stopSellBook.add(item);
                return; // DO NOT continue process
            }
            // market price <= trigger price: already triggered, fall through
            break;

        case CANCEL_BUY:
            // the order to cancel is looked up by msg.refSeqId.
            // check if in trailing stop list:
            TrailingStopItem buyTrailingItem = this.trailingBuyList.removeItem(msg.refSeqId);
            if (buyTrailingItem != null) {
                // removed from trailing stop list, also remove from stop order book:
                this.stopBuyBook.remove(buyTrailingItem.stopOrderItem.seqId);
                // cancel it:
                doCancelStopOrder(traceId, msg, buyTrailingItem.stopOrderItem);
                return;
            }
            // check if in stop order book:
            StopOrderItem buyStopItem = this.stopBuyBook.remove(msg.refSeqId);
            if (buyStopItem != null) {
                // cancel it:
                doCancelStopOrder(traceId, msg, buyStopItem);
                return;
            }
            // not a parked stop order: fall through
            break;

        case CANCEL_SELL:
            // the order to cancel is looked up by msg.refSeqId.
            // check if in trailing stop list:
            TrailingStopItem sellTrailingItem = this.trailingSellList.removeItem(msg.refSeqId);
            if (sellTrailingItem != null) {
                // removed from trailing stop list, also remove from stop order book:
                this.stopSellBook.remove(sellTrailingItem.stopOrderItem.seqId);
                // cancel it:
                doCancelStopOrder(traceId, msg, sellTrailingItem.stopOrderItem);
                return;
            }
            // check if in stop order book:
            StopOrderItem sellStopItem = this.stopSellBook.remove(msg.refSeqId);
            if (sellStopItem != null) {
                // cancel it:
                doCancelStopOrder(traceId, msg, sellStopItem);
                return;
            }
            // not a parked stop order: fall through
            break;

        default:
            throw new IllegalArgumentException("Invalid message type for stop order: " + msg.type);
    }
}

// process as non-stop order, remembering the market price from before processing so that a
// price move caused by this order can be detected:
final BigDecimal previousPrice = this.marketPrice;
processNonStopOrder(traceId, msg);
// must trace market price after process order:
final int cmp = this.marketPrice.compareTo(previousPrice);
if (cmp > 0) {
    // price up:
    logger.debug("[{}] check stop-buy-book...", traceId);
    // Trigger stop-buy orders one at a time. this.marketPrice is re-read on every iteration,
    // so the check uses whatever price the previously triggered order left behind.
    // NOTE(review): presumably getFirst() returns the item with the lowest triggerOn — confirm.
    for (; ; ) {
        StopOrderItem first = this.stopBuyBook.getFirst();
        if (first != null && this.marketPrice.compareTo(first.triggerOn) >= 0) {
            logger.info("[{}] trigger stop buy: {}...", traceId, first);
            this.stopBuyBook.remove(first.seqId);
            if (first.isTrailing) {
                this.trailingBuyList.removeItem(first.seqId);
            }
            processNonStopOrder(traceId, first.orderMessage);
        } else {
            // IMPORTANT: break loop if no stop-buy order or market price < triggerOn
            break;
        }
    }
    // update trailing sell book boundary: every item returned for the current market price
    // is taken out of the stop-sell book, reset against that price, and re-inserted.
    List<TrailingStopItem> updates = this.trailingSellList.removeToBeUpdatedItems(this.marketPrice);
    logger.debug("update {} trailing stop items...", updates.size());
    for (TrailingStopItem item : updates) {
        // step 1: remove from StopOrderBook:
        this.stopSellBook.remove(item.stopOrderItem.seqId);
        // step 2: reset trailing price:
        item.stopOrderItem.resetTrailing(this.marketPrice);
        // step 3: re-put into stop-order-book and trailing-list:
        this.stopSellBook.add(item.stopOrderItem);
        this.trailingSellList.addItem(item.stopOrderItem);
    }
} else if (cmp < 0) {
    // price down:
    logger.debug("[{}] check stop-sell-book...", traceId);
    // Mirror of the price-up branch: trigger stop-sell orders one at a time, re-reading
    // this.marketPrice on every iteration.
    // NOTE(review): presumably getFirst() returns the item with the highest triggerOn — confirm.
    for (; ; ) {
        StopOrderItem first = this.stopSellBook.getFirst();
        if (first != null && this.marketPrice.compareTo(first.triggerOn) <= 0) {
            logger.info("[{}] trigger stop sell: {}...", traceId, first);
            this.stopSellBook.remove(first.seqId);
            if (first.isTrailing) {
                this.trailingSellList.removeItem(first.seqId);
            }
            processNonStopOrder(traceId, first.orderMessage);
        } else {
            // IMPORTANT: break loop if no stop-sell order or market price > triggerOn
            break;
        }
    }
    // update trailing buy book boundary: every item returned for the current market price
    // is taken out of the stop-buy book, reset against that price, and re-inserted.
    List<TrailingStopItem> updates = this.trailingBuyList.removeToBeUpdatedItems(this.marketPrice);
    logger.debug("update {} trailing stop items...", updates.size());
    for (TrailingStopItem item : updates) {
        // step 1: remove from StopOrderBook:
        this.stopBuyBook.remove(item.stopOrderItem.seqId);
        // step 2: reset trailing price:
        item.stopOrderItem.resetTrailing(this.marketPrice);
        // step 3: re-put into stop-order-book and trailing-list:
        this.stopBuyBook.add(item.stopOrderItem);
        this.trailingBuyList.addItem(item.stopOrderItem);
    }
}
// when cmp == 0 the market price did not move, so no stop orders are checked.
Written on June 3, 2019