本文共 27691 字,大约阅读时间需要 92 分钟。
转载需注明出处。
Lean引擎的模块划分非常的规范。其中DataFeed是数据槽,就是供应数据的模块。
模块的接口为:
namespace QuantConnect.Lean.Engine.DataFeeds
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;

    /// <summary>
    /// Datafeed interface for creating custom datafeed sources.
    /// IDataFeed is the data-feed ("data slot") contract that every feed implementation must satisfy.
    /// </summary>
    public interface IDataFeed
    {
        /********************************************************
        * INTERFACE PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscriptions the algorithm has requested. Subscriptions contain the type,
        /// sourcing information and manage the enumeration of data.
        /// </summary>
        List<SubscriptionDataConfig> Subscriptions { get; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values
        /// (and calculation of the total portfolio value in realtime).
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions.</remarks>
        List<decimal> RealtimePrices { get; }

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary
        /// algorithm thread reads it out. One queue per subscription; each queued list is a
        /// time-grouped batch of data points.
        /// </summary>
        ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Boolean flag indicating there is no more data in any of our subscriptions.
        /// </summary>
        bool EndOfBridges { get; }

        /// <summary>
        /// Array of boolean flags indicating the data status for each queue/subscription we're tracking.
        /// </summary>
        bool[] EndOfBridge { get; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed.</remarks>
        DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Public flag indicator that the thread is still busy.
        /// </summary>
        bool IsActive { get; }

        /// <summary>
        /// The most advanced moment in time for which the data feed has completed loading data.
        /// </summary>
        DateTime LoadedDataFrontier { get; }

        /// <summary>
        /// Data has completely loaded and we don't expect any more.
        /// </summary>
        bool LoadingComplete { get; }

        /********************************************************
        * INTERFACE METHODS
        *********************************************************/
        /// <summary>
        /// Primary entry point: runs the feed thread until exit is signalled or data is exhausted.
        /// </summary>
        void Run();

        /// <summary>
        /// External controller calls to signal a terminate of the thread.
        /// </summary>
        void Exit();

        /// <summary>
        /// Purge all remaining data in the thread.
        /// </summary>
        void PurgeData();
    }
}
2. BaseDataFeed 数据槽基类
它实现了IDataFeed接口,并且是其他数据槽派生类的基类。
namespace QuantConnect.Lean.Engine.DataFeeds
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;

    /// <summary>
    /// Common components of a data feed allowing the extender to implement only the parts which matter.
    /// Base class of the data feeds: derived classes only need to supply <see cref="GetData"/>.
    /// </summary>
    public abstract class BaseDataFeed : IDataFeed
    {
        /********************************************************
        * CLASS VARIABLES
        *********************************************************/
        private IAlgorithm _algorithm;
        private BacktestNodePacket _job;
        private bool _endOfStreams = false;
        private int _subscriptions = 0;
        // Total bridge capacity; divided evenly per subscription in the constructor.
        private int _bridgeMax = 500000;
        private bool _exitTriggered = false;
        // Per-subscription high-water mark of the data loaded so far.
        private DateTime[] _frontierTime;

        /********************************************************
        * CLASS PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscriptions the algorithm has requested. Subscriptions contain the type,
        /// sourcing information and manage the enumeration of data.
        /// </summary>
        public List<SubscriptionDataConfig> Subscriptions { get; private set; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values
        /// (and calculation of the total portfolio value in realtime).
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions.</remarks>
        public List<decimal> RealtimePrices { get; private set; }

        /// <summary>
        /// Cross-threading queues ("bridge") so the datafeed pushes data into the queue and the
        /// primary algorithm thread reads it out.
        /// </summary>
        public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Stream readers created from the configuration settings, one per subscription.
        /// </summary>
        public SubscriptionDataReader[] SubscriptionReaderManagers { get; set; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed.</remarks>
        public DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Flag indicating the handler thread is completely finished and ready to dispose.
        /// </summary>
        public bool IsActive { get; private set; }

        /// <summary>
        /// Flag indicating the file system has loaded all files.
        /// </summary>
        public bool LoadingComplete { get; private set; }

        /// <summary>
        /// Furthest point in time that the data has loaded into the bridges.
        /// </summary>
        public DateTime LoadedDataFrontier { get; private set; }

        /// <summary>
        /// Signifying no more data across all bridges: every queue drained and every stream flagged done.
        /// </summary>
        public bool EndOfBridges
        {
            get
            {
                for (var i = 0; i < Bridge.Length; i++)
                {
                    if (Bridge[i].Count != 0 || EndOfBridge[i] != true)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// End of stream flag for each bridge.
        /// </summary>
        public bool[] EndOfBridge { get; set; }

        /********************************************************
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Create an instance of the base datafeed.
        /// </summary>
        /// <param name="algorithm">Algorithm providing the subscription manager and securities</param>
        /// <param name="job">Backtest job supplying the period start/finish</param>
        public BaseDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
        {
            //Save the data subscriptions: one entry per subscribed security.
            Subscriptions = algorithm.SubscriptionManager.Subscriptions;
            _subscriptions = Subscriptions.Count;

            //Public Properties:
            DataFeed = DataFeedEndpoint.FileSystem;   // default: read from the file system
            IsActive = true;
            Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];
            EndOfBridge = new bool[_subscriptions];
            SubscriptionReaderManagers = new SubscriptionDataReader[_subscriptions];
            RealtimePrices = new List<decimal>(_subscriptions);
            _frontierTime = new DateTime[_subscriptions];

            //Class Privates:
            _job = job;
            _algorithm = algorithm;
            _endOfStreams = false;
            // Share the total capacity evenly across subscriptions.
            _bridgeMax = _bridgeMax / _subscriptions;

            //Initialize arrays:
            for (var i = 0; i < _subscriptions; i++)
            {
                _frontierTime[i] = job.PeriodStart;
                EndOfBridge[i] = false;
                Bridge[i] = new ConcurrentQueue<List<BaseData>>();
                // NOTE(review): the reader endpoint here is Database while DataFeed defaults to
                // FileSystem — preserved from the original; confirm which endpoint is intended.
                SubscriptionReaderManagers[i] = new SubscriptionDataReader(Subscriptions[i],
                    algorithm.Securities[Subscriptions[i].Symbol], DataFeedEndpoint.Database,
                    job.PeriodStart, job.PeriodFinish);
            }
        }

        /// <summary>
        /// Launch the primary data thread: loop until exit is triggered or all bridges are done,
        /// topping up each subscription's queue from <see cref="GetData"/>.
        /// </summary>
        public virtual void Run()
        {
            while (!_exitTriggered && IsActive && !EndOfBridges)
            {
                for (var i = 0; i < Subscriptions.Count; i++)
                {
                    //With each subscription; fetch the next increment of data from the queues:
                    var subscription = Subscriptions[i];
                    // Back-pressure: only refill while the queue is below its cap.
                    if (Bridge[i].Count < 10000 && !EndOfBridge[i])
                    {
                        var data = GetData(subscription);

                        //Comment out for live databases, where we should continue asking even if no data.
                        if (data.Count == 0)
                        {
                            // No more data for this subscription: mark its bridge finished.
                            EndOfBridge[i] = true;
                            continue;
                        }

                        //Insert data into bridge, each list is time-grouped. Assume all different time-groups.
                        foreach (var obj in data)
                        {
                            Bridge[i].Enqueue(new List<BaseData> { obj });
                        }

                        //Record the furthest moment in time for this subscription.
                        _frontierTime[i] = data.Max(bar => bar.Time);
                    }
                }
                //Set the most backward moment in time we've loaded across all subscriptions.
                LoadedDataFrontier = _frontierTime.Min();
            }
            IsActive = false;
        }

        /// <summary>
        /// Get the next set of data for this subscription.
        /// </summary>
        /// <param name="subscription">Subscription to fetch data for</param>
        /// <returns>The next batch of data points; empty when the stream is exhausted</returns>
        public abstract List<BaseData> GetData(SubscriptionDataConfig subscription);

        /// <summary>
        /// Send an exit signal to the thread.
        /// </summary>
        public virtual void Exit()
        {
            _exitTriggered = true;
            PurgeData();
        }

        /// <summary>
        /// Loop over all the queues and clear them to fast-quit this thread and return to main.
        /// </summary>
        public virtual void PurgeData()
        {
            foreach (var t in Bridge)
            {
                t.Clear();
            }
        }
    }
}

// 3. FileSystemDataFeed — the file-system data feed (next section).
namespace QuantConnect.Lean.Engine.DataFeeds
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    /********************************************************
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Historical datafeed stream reader for processing files on a local disk.
    /// Loads historical data from the local file system.
    /// </summary>
    /// <remarks>Filesystem datafeeds are incredibly fast.</remarks>
    public class FileSystemDataFeed : IDataFeed
    {
        /********************************************************
        * CLASS VARIABLES
        *********************************************************/
        // Set types in public area to speed up:
        private IAlgorithm _algorithm;
        private BacktestNodePacket _job;
        private bool _endOfStreams = false;
        private int _subscriptions = 0;
        // Total bridge capacity; divided evenly per subscription in the constructor.
        private int _bridgeMax = 500000;
        private bool _exitTriggered = false;

        /********************************************************
        * CLASS PROPERTIES
        *********************************************************/
        /// <summary>
        /// List of the subscriptions the algorithm has requested. Subscriptions contain the type,
        /// sourcing information and manage the enumeration of data.
        /// </summary>
        public List<SubscriptionDataConfig> Subscriptions { get; private set; }

        /// <summary>
        /// Prices of the datafeed this instant for dynamically updating security values
        /// (and calculation of the total portfolio value in realtime).
        /// </summary>
        /// <remarks>Indexed in order of the subscriptions.</remarks>
        public List<decimal> RealtimePrices { get; private set; }

        /// <summary>
        /// Cross-threading queues so the datafeed pushes data into the queue and the primary
        /// algorithm thread reads it out.
        /// </summary>
        public ConcurrentQueue<List<BaseData>>[] Bridge { get; set; }

        /// <summary>
        /// Set the source of the data we're requesting for the type-readers to know where to get data from.
        /// </summary>
        /// <remarks>Live or Backtesting Datafeed.</remarks>
        public DataFeedEndpoint DataFeed { get; set; }

        /// <summary>
        /// Flag indicating the handler thread is completely finished and ready to dispose.
        /// </summary>
        public bool IsActive { get; private set; }

        /// <summary>
        /// Flag indicating the file system has loaded all files.
        /// </summary>
        public bool LoadingComplete { get; private set; }

        /// <summary>
        /// Furthest point in time that the data has loaded into the bridges.
        /// </summary>
        public DateTime LoadedDataFrontier { get; private set; }

        /// <summary>
        /// Stream readers created from the configuration settings, one per subscription.
        /// </summary>
        private SubscriptionDataReader[] SubscriptionReaders { get; set; }

        /// <summary>
        /// Signifying no more data across all bridges.
        /// </summary>
        public bool EndOfBridges
        {
            get
            {
                for (var i = 0; i < Bridge.Length; i++)
                {
                    if (Bridge[i].Count != 0 || EndOfBridge[i] != true || _endOfStreams != true)
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// End of stream flag for each bridge.
        /// </summary>
        public bool[] EndOfBridge { get; set; }

        /// <summary>
        /// Frontiers for each fill forward high water mark.
        /// </summary>
        public DateTime[] FillForwardFrontiers;

        /********************************************************
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Create a new backtesting data feed.
        /// </summary>
        /// <param name="algorithm">Instance of the algorithm</param>
        /// <param name="job">Algorithm work task</param>
        public FileSystemDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
        {
            Console.WriteLine("FileSystemDataFeed,algorithm:" + algorithm + ",job: " + job);
            Subscriptions = algorithm.SubscriptionManager.Subscriptions;
            Console.WriteLine("Subscriptions.count:" + Subscriptions.Count);
            _subscriptions = Subscriptions.Count;

            //Public Properties:
            DataFeed = DataFeedEndpoint.FileSystem;
            IsActive = true;
            Bridge = new ConcurrentQueue<List<BaseData>>[_subscriptions];
            EndOfBridge = new bool[_subscriptions];
            SubscriptionReaders = new SubscriptionDataReader[_subscriptions];
            FillForwardFrontiers = new DateTime[_subscriptions];
            RealtimePrices = new List<decimal>(_subscriptions);

            //Class Privates:
            _job = job;
            _algorithm = algorithm;
            _endOfStreams = false;
            _bridgeMax = _bridgeMax / _subscriptions;

            //Set the bridge maximum count:
            for (var i = 0; i < _subscriptions; i++)
            {
                //Create a new instance in the dictionary:
                Bridge[i] = new ConcurrentQueue<List<BaseData>>();
                EndOfBridge[i] = false;
                SubscriptionReaders[i] = new SubscriptionDataReader(Subscriptions[i],
                    _algorithm.Securities[Subscriptions[i].Symbol], DataFeed,
                    _job.PeriodStart, _job.PeriodFinish);
                FillForwardFrontiers[i] = new DateTime();
            }
        }

        /********************************************************
        * CLASS METHODS
        *********************************************************/
        /// <summary>
        /// Main routine for datafeed analysis.
        /// </summary>
        /// <remarks>This is a hot-thread and should be kept extremely lean. Modify with caution.</remarks>
        public void Run()
        {
            Log.Trace("debug FileSystemDataFeed.run()");
            Console.WriteLine("FileSystemDataFeed.run()");

            //Calculate the increment based on the subscriptions:
            var tradeBarIncrements = CalculateIncrement(includeTick: false);
            var increment = CalculateIncrement(includeTick: true);

            //Loop over each date in the job
            foreach (var date in Time.EachTradeableDay(_algorithm.Securities, _job.PeriodStart, _job.PeriodFinish))
            {
                Log.Trace("in trading date:" + date + ",PeriodStart:" + _job.PeriodStart + ",PeriodFinish:" + _job.PeriodFinish);
                //Update the source-URL from the BaseData, reset the frontier to today.
                //Update the source URL once per day; 'frontier' is the next moment in the future.
                var frontier = date.Add(increment);
                var activeStreams = _subscriptions;
                Log.Trace("subscription:" + _subscriptions);

                //Initialize the feeds to this date:
                for (var i = 0; i < _subscriptions; i++)
                {
                    //Don't refresh source when we know the market is closed for this security:
                    Log.Trace("i:" + i + "subscription");
                    var success = SubscriptionReaders[i].RefreshSource(date);

                    //If we know the market is closed for security then can declare bridge closed.
                    if (success)
                    {
                        EndOfBridge[i] = false;
                    }
                    else
                    {
                        ProcessMissingFileFillForward(SubscriptionReaders[i], i, tradeBarIncrements, date);
                        EndOfBridge[i] = true;
                    }
                }

                //Pause the DataFeed while bridges are full, but allow missing files to pass.
                var bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
                var bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
                var active = GetActiveStreams();
                while (bridgeFullCount > 0 && ((_subscriptions - active) == bridgeZeroCount) && !_exitTriggered)
                {
                    bridgeFullCount = Bridge.Count(bridge => bridge.Count >= _bridgeMax);
                    bridgeZeroCount = Bridge.Count(bridge => bridge.Count == 0);
                    Thread.Sleep(5);
                }

                // Step through the day at the smallest resolution:
                var datePlusOneDay = date.Date.AddDays(1);
                while ((frontier.Date == date.Date || frontier.Date == datePlusOneDay) && !_exitTriggered)
                {
                    var cache = new List<BaseData>[_subscriptions];

                    //Reset Loop:
                    long earlyBirdTicks = 0;

                    //Go over all the subscriptions, one by one add a minute of data to the bridge.
                    for (var i = 0; i < _subscriptions; i++)
                    {
                        //Get the reader manager for the i-th security:
                        var manager = SubscriptionReaders[i];

                        //End of the manager stream set flag to end bridge: also if the EOB flag set, from the refresh source method above
                        if (manager.EndOfStream || EndOfBridge[i])
                        {
                            EndOfBridge[i] = true;
                            activeStreams = GetActiveStreams();
                            if (activeStreams == 0)
                            {
                                frontier = frontier.Date + TimeSpan.FromDays(1);
                            }
                            continue;
                        }

                        //Initialize data store:
                        cache[i] = new List<BaseData>(2);

                        //Add the last iteration to the new list: only if it falls into this time category.
                        //Key step: drain every data point before the frontier into this security's list.
                        var cacheAtIndex = cache[i];
                        while (manager.Current.EndTime < frontier)
                        {
                            Log.Trace("Current:symbol:" + manager.Current.Symbol + ",price" + manager.Current.Price);
                            cacheAtIndex.Add(manager.Current);
                            Log.Trace(string.Format("FileSystemDataFeed,Current: {0}", manager.Current));
                            if (!manager.MoveNext()) break;
                        }

                        //Save the next earliest time from the bridges: only if we're not filling forward.
                        if (manager.Current != null)
                        {
                            if (earlyBirdTicks == 0 || manager.Current.EndTime.Ticks < earlyBirdTicks)
                            {
                                earlyBirdTicks = manager.Current.EndTime.Ticks;
                            }
                        }
                    }

                    if (activeStreams == 0)
                    {
                        break;
                    }

                    //Add all the lists to the bridge, release the bridge:
                    //we push all the data up to this frontier into the bridge at once.
                    for (var i = 0; i < _subscriptions; i++)
                    {
                        if (cache[i] != null && cache[i].Count > 0)
                        {
                            FillForwardFrontiers[i] = cache[i][0].EndTime;
                            Bridge[i].Enqueue(cache[i]);
                        }
                        ProcessFillForward(SubscriptionReaders[i], i, tradeBarIncrements);
                    }

                    //This will let consumers know we have loaded data up to this date,
                    //so that the data stream doesn't pull off data from the same time period in different events.
                    LoadedDataFrontier = frontier;

                    if (earlyBirdTicks > 0 && earlyBirdTicks > frontier.Ticks)
                    {
                        //Jump increment to the nearest second, in the future: round down, add increment.
                        frontier = (new DateTime(earlyBirdTicks)).RoundDown(increment) + increment;
                    }
                    else
                    {
                        //Otherwise step one forward.
                        frontier += increment;
                    }
                } // End of This Day.

                if (_exitTriggered) break;
            } // End of All Days:

            Log.Trace(DataFeed + ".Run(): Data Feed Completed.");
            LoadingComplete = true;

            //Make sure all bridges empty before declaring "end of bridge":
            while (!EndOfBridges && !_exitTriggered)
            {
                for (var i = 0; i < _subscriptions; i++)
                {
                    //Nothing left in the bridge, mark it as finished:
                    if (Bridge[i].Count == 0)
                    {
                        EndOfBridge[i] = true;
                    }
                }
                if (GetActiveStreams() == 0) _endOfStreams = true;
                Thread.Sleep(100);
            }

            //Close up all streams:
            for (var i = 0; i < Subscriptions.Count; i++)
            {
                SubscriptionReaders[i].Dispose();
            }

            Log.Trace(DataFeed + ".Run(): Ending Thread... ");
            IsActive = false;
        }

        /// <summary>
        /// Send an exit signal to the thread.
        /// </summary>
        public void Exit()
        {
            _exitTriggered = true;
            PurgeData();
        }

        /// <summary>
        /// Loop over all the queues and clear them to fast-quit this thread and return to main.
        /// </summary>
        public void PurgeData()
        {
            foreach (var t in Bridge)
            {
                t.Clear();
            }
        }

        /// <summary>
        /// When a source file is missing for a day, copy the current data point forward
        /// across that day's market-open hours.
        /// </summary>
        /// <param name="manager">Subscription reader whose file was missing</param>
        /// <param name="i">Subscription position in the bridge</param>
        /// <param name="increment">Timespan between fill-forward bars</param>
        /// <param name="dateToFill">The day that has no source file</param>
        private void ProcessMissingFileFillForward(SubscriptionDataReader manager, int i, TimeSpan increment, DateTime dateToFill)
        {
            // we'll copy the current into the next day
            var subscription = Subscriptions[i];
            if (!subscription.FillDataForward || manager.Current == null) return;

            var start = dateToFill.Date + manager.Exchange.MarketOpen;
            if (subscription.ExtendedMarketHours)
            {
                start = dateToFill.Date + manager.Exchange.ExtendedMarketOpen;
            }

            // shift the 'start' time to the end of the bar by adding the increment, this makes 'date'
            // the end time which also allows the market open functions to behave as expected
            var current = manager.Current;
            for (var endTime = start.Add(increment); endTime.Date == dateToFill.Date; endTime = endTime + increment)
            {
                if (manager.IsMarketOpen(endTime) || (subscription.ExtendedMarketHours && manager.IsExtendedMarketOpen(endTime)))
                {
                    EnqueueFillForwardData(i, current, endTime);
                }
                else
                {
                    // stop fill forwarding when we're no longer open
                    break;
                }
            }
        }

        /// <summary>
        /// If this is a fillforward subscription, look at the previous time, and current time, and add new
        /// objects to queue until current time to fill up the gaps.
        /// </summary>
        /// <param name="manager">Subscription to process</param>
        /// <param name="i">Subscription position in the bridge ( which queue are we pushing data to )</param>
        /// <param name="increment">Timespan increment to jump the fillforward results</param>
        private void ProcessFillForward(SubscriptionDataReader manager, int i, TimeSpan increment)
        {
            // If previous == null cannot fill forward nothing there to move forward (e.g. cases where file not found on first file).
            if (!Subscriptions[i].FillDataForward || manager.Previous == null || manager.Current == null) return;

            //Last tradebar and the current one we're about to add to queue:
            var previous = manager.Previous;
            var current = manager.Current;

            // final two points of file that ends at midnight, causes issues in the day rollover/fill forward
            if (current.EndTime.TimeOfDay.Ticks == 0 && previous.EndTime == current.Time)
            {
                return;
            }

            //Initialize the frontier:
            if (FillForwardFrontiers[i].Ticks == 0) FillForwardFrontiers[i] = previous.EndTime;

            // using the previous to fill forward since 'current' is ahead the frontier
            var whatToFill = previous;
            // using current.EndTime as fill until because it's the next piece of data we have for this subscription
            var fillUntil = current.EndTime;

            //Data ended before the market closed: premature ending flag - continue filling forward until market close.
            if (manager.EndOfStream && manager.IsMarketOpen(current.EndTime))
            {
                //Make sure we only fill forward to end of *today* -- don't fill forward tomorrow just because its also open
                fillUntil = FillForwardFrontiers[i].Date.AddDays(1);
                // since we ran out of data, use the current as the clone source, it's more recent than previous
                whatToFill = current;
            }

            // loop from our last time (previous.EndTime) to our current.EndTime, filling in all missing bars during
            // requested market hours
            for (var endTime = FillForwardFrontiers[i] + increment; (endTime < fillUntil); endTime = endTime + increment)
            {
                if (Subscriptions[i].ExtendedMarketHours)
                {
                    if (!manager.IsExtendedMarketOpen(endTime.Subtract(increment)))
                    {
                        //If we've asked for extended hours, and the security is no longer inside extended market hours, skip:
                        continue;
                    }
                }
                else
                {
                    // if the market isn't open skip to the current.EndTime and rewind until the market is open.
                    // this is the case where the previous value is from yesterday but we're trying to fill forward
                    // the next day, so instead of zooming through 18 hours of off-market hours, skip to our current data
                    // point and rewind to the market open.
                    //
                    // E.g, Current.EndTime = 9:40am and Previous.EndTime = 2:00pm, so fill in from 2->4pm
                    // and then skip to 9:40am, reverse to 9:30am and fill from 9:30->9:40
                    if (!manager.IsMarketOpen(endTime.Subtract(increment)) && Subscriptions[i].Resolution != Resolution.Daily)
                    {
                        // Move fill forward so we don't waste time in this tight loop.
                        endTime = fillUntil;
                        do
                        {
                            endTime = endTime - increment;
                        }
                        // is market open assumes start time of bars, open at 9:30 closed at 4:00
                        // so decrement our date to use the start time
                        while (manager.IsMarketOpen(endTime.Subtract(increment)));
                        continue;
                    }
                }

                // add any overlap condition here
                if (Subscriptions[i].Resolution == Resolution.Daily)
                {
                    // handle fill forward on lower resolutions
                    var barStartTime = endTime - increment;
                    if (manager.Exchange.IsOpenDuringBar(barStartTime, endTime, Subscriptions[i].ExtendedMarketHours))
                    {
                        EnqueueFillForwardData(i, previous, endTime);
                    }
                    // special case catch missing days
                    else if (endTime.TimeOfDay.Ticks == 0 && manager.Exchange.DateIsOpen(endTime.Date.AddDays(-1)))
                    {
                        EnqueueFillForwardData(i, previous, endTime);
                    }
                    continue;
                }

                EnqueueFillForwardData(i, whatToFill, endTime);
            }
        }

        /// <summary>
        /// Clone the given data point to the given end time and push it onto the bridge queue.
        /// </summary>
        /// <param name="i">Subscription position in the bridge</param>
        /// <param name="previous">Data point to clone forward (fill-forward source)</param>
        /// <param name="dataEndTime">End time for the cloned bar</param>
        private void EnqueueFillForwardData(int i, BaseData previous, DateTime dataEndTime)
        {
            var cache = new List<BaseData>(1);
            var fillforward = previous.Clone(true);
            fillforward.Time = dataEndTime.Subtract(Subscriptions[i].Increment);
            fillforward.EndTime = dataEndTime;
            FillForwardFrontiers[i] = dataEndTime;
            cache.Add(fillforward);
            Bridge[i].Enqueue(cache);
        }

        /// <summary>
        /// Get the number of active streams still in the EndOfBridge array.
        /// </summary>
        /// <returns>Count of the number of streams with data</returns>
        private int GetActiveStreams()
        {
            //Get the number of active streams:
            var activeStreams = (from stream in EndOfBridge
                                 where stream == false
                                 select stream).Count();
            return activeStreams;
        }

        /// <summary>
        /// Calculate the minimum increment to scan for data based on the data requested.
        /// </summary>
        /// <param name="includeTick">When true the subscriptions include a tick data source, meaning there is almost no increment.</param>
        /// <returns>Timespan to jump the data source so it efficiently orders the results</returns>
        private TimeSpan CalculateIncrement(bool includeTick)
        {
            var increment = TimeSpan.FromDays(1);
            foreach (var config in Subscriptions)
            {
                switch (config.Resolution)
                {
                    //Hourly TradeBars:
                    case Resolution.Hour:
                        if (increment > TimeSpan.FromHours(1))
                        {
                            increment = TimeSpan.FromHours(1);
                        }
                        break;

                    //Minutely TradeBars:
                    case Resolution.Minute:
                        if (increment > TimeSpan.FromMinutes(1))
                        {
                            increment = TimeSpan.FromMinutes(1);
                        }
                        break;

                    //Secondly Bars:
                    case Resolution.Second:
                        if (increment > TimeSpan.FromSeconds(1))
                        {
                            increment = TimeSpan.FromSeconds(1);
                        }
                        break;

                    //Ticks: No increment; just fire each data piece in as they happen.
                    case Resolution.Tick:
                        if (increment > TimeSpan.FromMilliseconds(1) && includeTick)
                        {
                            increment = new TimeSpan(0, 0, 0, 0, 1);
                        }
                        break;
                }
            }
            return increment;
        }
    } // End FileSystem Local Feed Class:
} // End Namespace

// 4. BacktestingDataFeed — the backtesting data feed (next section).
namespace QuantConnect.Lean.Engine.DataFeeds
{
    /********************************************************
    * CLASS DEFINITIONS
    *********************************************************/
    /// <summary>
    /// Backtesting data feed extends the filesystem data feed with almost no modifications. Later this
    /// method can be used for implementing alternative sources/generation for backtesting data.
    /// </summary>
    public class BacktestingDataFeed : FileSystemDataFeed
    {
        /********************************************************
        * CLASS CONSTRUCTOR
        *********************************************************/
        /// <summary>
        /// Pass through the backtesting datafeed to the underlying file system datafeed implementation.
        /// </summary>
        /// <param name="algorithm">Algorithm we're operating with</param>
        /// <param name="job">Algorithm worker job</param>
        public BacktestingDataFeed(IAlgorithm algorithm, BacktestNodePacket job)
            : base(algorithm, job)
        {
            // Only difference from the base feed: tag the endpoint as Backtesting.
            DataFeed = DataFeedEndpoint.Backtesting;
        }
    } // End Backtesting Feed Class:

    // Note: there are also a DatabaseDataFeed and a LiveTradingDataFeed (live trading);
    // they are not covered here.
} // End Namespace