C\C++学习 公积金相关帖子汇总 魅族M8SDK学习系列不断更新中 Mini开发小组软件发布

[Delphi] delphi线程池【转】

[ 1545 查看 / 0 回复 ]

实现代码
  1. unit uThreadPool;

  2. {  aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); }

  3. interface
  4. uses
  5.   Windows,
  6.   Classes;

  7. // 是否记录日志
  8. // {$DEFINE NOLOGS}

  9. type
  10.   TCriticalSection = class(TObject)
  11.   protected
  12.     FSection: TRTLCriticalSection;
  13.   public
  14.     constructor Create;
  15.     destructor Destroy; override;
  16.     // 进入临界区
  17.     procedure Enter;
  18.     // 离开临界区
  19.     procedure Leave;
  20.     // 尝试进入
  21.     function TryEnter: Boolean;
  22.   end;

  23. type
  24.   // 储存请求数据的基本类
  25.   TWorkItem = class(TObject)
  26.   public
  27.     // 是否有重复任务
  28.     function IsTheSame(DataObj: TWorkItem): Boolean; virtual;
  29.     // 如果 NOLOGS 被定义,则禁用。
  30.     function TextForLog: string; virtual;
  31.   end;

  32. type
  33.   TThreadsPool = class;

  34.   //线程状态
  35.   TThreadState = (tcsInitializing, tcsWaiting, tcsGetting, tcsProcessing,
  36.     tcsProcessed, tcsTerminating, tcsCheckingDown);
  37.   // 工作线程仅用于线程池内, 不要直接创建并调用它。
  38.   TProcessorThread = class(TThread)
  39.   *******
  40.     // 创建线程时临时的Event对象, 阻塞线程直到初始化完成
  41.     hInitFinished: THandle;
  42.     // 初始化出错信息
  43.     sInitError: string;
  44.     // 记录日志
  45.     procedure WriteLog(const Str: string; Level: Integer = 0);
  46.   protected
  47.     // 线程临界区同步对像
  48.     csProcessingDataObject: TCriticalSection;
  49.     // 平均处理时间
  50.     FAverageProcessing: Integer;
  51.     // 等待请求的平均时间
  52.     FAverageWaitingTime: Integer;
  53.     // 本线程实例的运行状态
  54.     FCurState: TThreadState;
  55.     // 本线程实例所附属的线程池
  56.     FPool: TThreadsPool;
  57.     // 当前处理的数据对像。
  58.     FProcessingDataObject: TWorkItem;
  59.     // 线程停止 Event, TProcessorThread.Terminate 中开绿灯
  60.     hThreadTerminated: THandle;
  61.     uProcessingStart: DWORD;
  62.     // 开始等待的时间, 通过 GetTickCount 取得。
  63.     uWaitingStart: DWORD;
  64.     // 计算平均工作时间
  65.     function AverageProcessingTime: DWORD;
  66.     // 计算平均等待时间
  67.     function AverageWaitingTime: DWORD;
  68.     procedure Execute; override;
  69.     function IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
  70.     // 转换枚举类型的线程状态为字串类型
  71.     function InfoText: string;
  72.     // 线程是否长时间处理同一个请求?(已死掉?)
  73.     function IsDead: Boolean;
  74.     // 线程是否已完成当成任务
  75.     function isFinished: Boolean;
  76.     // 线程是否处于空闲状态
  77.     function isIdle: Boolean;
  78.     // 平均值校正计算。
  79.     function NewAverage(OldAvg, NewVal: Integer): Integer;
  80.   public
  81.     Tag: Integer;
  82.     constructor Create(APool: TThreadsPool);
  83.     destructor Destroy; override;
  84.     procedure Terminate;
  85.   end;

  86.   // 线程初始化时触发的事件
  87.   TProcessorThreadInitializing = procedure(Sender: TThreadsPool; aThread:
  88.     TProcessorThread) of object;
  89.   // 线程结束时触发的事件
  90.   TProcessorThreadFinalizing = procedure(Sender: TThreadsPool; aThread:
  91.     TProcessorThread) of object;
  92.   // 线程处理请求时触发的事件
  93.   TProcessRequest = procedure(Sender: TThreadsPool; WorkItem: TWorkItem;
  94.     aThread: TProcessorThread) of object;
  95.   TEmptyKind = (
  96.     ekQueueEmpty, //任务被取空后
  97.     ekProcessingFinished // 最后一个任务处理完毕后
  98.     );
  99.   // 任务队列空时触发的事件
  100.   TQueueEmpty = procedure(Sender: TThreadsPool; EmptyKind: TEmptyKind) of
  101.     object;

  102.   TThreadsPool = class(TComponent)
  103.   *******
  104.     csQueueManagment: TCriticalSection;
  105.     csThreadManagment: TCriticalSection;
  106.     FProcessRequest: TProcessRequest;
  107.     FQueue: TList;
  108.     FQueueEmpty: TQueueEmpty;
  109.     // 线程超时阀值
  110.     FThreadDeadTimeout: DWORD;
  111.     FThreadFinalizing: TProcessorThreadFinalizing;
  112.     FThreadInitializing: TProcessorThreadInitializing;
  113.     // 工作中的线程
  114.     FThreads: TList;
  115.     // 执行了 terminat 发送退出指令, 正在结束的线程.
  116.     FThreadsKilling: TList;
  117.     // 最少, 最大线程数
  118.     FThreadsMax: Integer;
  119.     // 最少, 最大线程数
  120.     FThreadsMin: Integer;
  121.     // 池平均等待时间
  122.     function PoolAverageWaitingTime: Integer;
  123.     procedure WriteLog(const Str: string; Level: Integer = 0);
  124.   protected
  125.     FLastGetPoint: Integer;
  126.     // Semaphore, 统计任务队列
  127.     hSemRequestCount: THandle;
  128.     // Waitable timer. 每30触发一次的时间量同步
  129.     hTimCheckPoolDown: THandle;
  130.     // 线程池停机(检查并清除空闲线程和死线程)
  131.     procedure CheckPoolDown;
  132.     // 清除死线程,并补充不足的工作线程
  133.     procedure CheckThreadsForGrow;
  134.     procedure DoProcessed;
  135.     procedure DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread);
  136.       virtual;
  137.     procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual;
  138.     procedure DoThreadFinalizing(aThread: TProcessorThread); virtual;
  139.     // 执行事件
  140.     procedure DoThreadInitializing(aThread: TProcessorThread); virtual;
  141.     // 释放 FThreadsKilling 列表中的线程
  142.     procedure FreeFinishedThreads;
  143.     // 申请任务
  144.     procedure GetRequest(out Request: TWorkItem);
  145.     // 清除死线程
  146.     procedure KillDeadThreads;
  147.   public
  148.     constructor Create(AOwner: TComponent); override;
  149.     destructor Destroy; override;
  150.     // 就进行任务是否重复的检查, 检查发现重复就返回 False
  151.     function AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean =
  152.       False): Boolean; overload;
  153.     // 转换枚举类型的线程状态为字串类型
  154.     function InfoText: string;
  155.   published
  156.     // 线程处理任务时触发的事件
  157.     property OnProcessRequest: TProcessRequest read FProcessRequest write
  158.       FProcessRequest;
  159.     // 任务列表为空时解发的事件
  160.     property OnQueueEmpty: TQueueEmpty read FQueueEmpty write FQueueEmpty;
  161.     // 线程结束时触发的事件
  162.     property OnThreadFinalizing: TProcessorThreadFinalizing read
  163.       FThreadFinalizing write FThreadFinalizing;
  164.     // 线程初始化时触发的事件
  165.     property OnThreadInitializing: TProcessorThreadInitializing read
  166.       FThreadInitializing write FThreadInitializing;
  167.     // 线程超时值(毫秒), 如果处理超时,将视为死线程
  168.     property ThreadDeadTimeout: DWORD read FThreadDeadTimeout write
  169.       FThreadDeadTimeout default 0;
  170.     // 最大线程数
  171.     property ThreadsMax: Integer read FThreadsMax write FThreadsMax default 1;
  172.     // 最小线程数
  173.     property ThreadsMin: Integer read FThreadsMin write FThreadsMin default 0;
  174.   end;

  175. type
  176.   //日志记志函数
  177.   TLogWriteProc = procedure(
  178.     const Str: string; //日志
  179.     LogID: Integer = 0;
  180.     Level: Integer = 0 //Level = 0 - 跟踪信息, 10 - 致命错误
  181.     );

  182. var
  183.   WriteLog: TLogWriteProc; // 如果存在实例就写日志

  184. implementation
  185. uses
  186.   SysUtils;

  187. // 储存请求数据的基本类
  188. {
  189. ********************************** TWorkItem ***********************************
  190. }

  191. function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean;
  192. begin
  193.   Result := False;
  194. end; { TWorkItem.IsTheSame }

  195. function TWorkItem.TextForLog: string;
  196. begin
  197.   Result := 'Request';
  198. end; { TWorkItem.TextForLog }

  199. {
  200. ********************************* TThreadsPool *********************************
  201. }

  202. constructor TThreadsPool.Create(AOwner: TComponent);
  203. var
  204.   DueTo: Int64;
  205. begin
  206. {$IFNDEF NOLOGS}
  207.   WriteLog('创建线程池', 5);
  208. {$ENDIF}
  209.   inherited;
  210.   csQueueManagment := TCriticalSection.Create;
  211.   FQueue := TList.Create;
  212.   csThreadManagment := TCriticalSection.Create;
  213.   FThreads := TList.Create;
  214.   FThreadsKilling := TList.Create;
  215.   FThreadsMin := 0;
  216.   FThreadsMax := 1;
  217.   FThreadDeadTimeout := 0;
  218.   FLastGetPoint := 0;
  219.   //
  220.   hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);

  221.   DueTo := -1;
  222.   //可等待的定时器(只用于Window NT4或更高)
  223.   hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil);

  224.   if hTimCheckPoolDown = 0 then // Win9x不支持
  225.     // In Win9x number of thread will be never decrised
  226.     hTimCheckPoolDown := CreateEvent(nil, False, False, nil)
  227.   else
  228.     SetWaitableTimer(hTimCheckPoolDown, DueTo, 30000, nil, nil, False);
  229. end; { TThreadsPool.Create }

  230. destructor TThreadsPool.Destroy;
  231. var
  232.   n, i: Integer;
  233.   Handles: array of THandle;
  234. begin
  235. {$IFNDEF NOLOGS}
  236.   WriteLog('线程池销毁', 5);
  237. {$ENDIF}
  238.   csThreadManagment.Enter;

  239.   SetLength(Handles, FThreads.Count);
  240.   n := 0;
  241.   for i := 0 to FThreads.Count - 1 do
  242.     if FThreads[i] <> nil then
  243.     begin
  244.       Handles[n] := TProcessorThread(FThreads[i]).Handle;
  245.       TProcessorThread(FThreads[i]).Terminate;
  246.       Inc(n);
  247.     end;

  248.   csThreadManagment.Leave;  // lixiaoyu 添加于 2009.1.6,如没有此行代码无法成功释放正在执行中的工作者线程,死锁。

  249.   WaitForMultipleObjects(n, @Handles[0], True, 30000);  // 等待工作者线程执行终止  lixiaoyu 注释于 2009.1.6

  250.   csThreadManagment.Enter;  // lixiaoyu 添加于 2009.1.6 再次进入锁定,并释放资源
  251.   for i := 0 to FThreads.Count - 1 do
  252.     TProcessorThread(FThreads[i]).Free;
  253.   FThreads.Free;
  254.   FThreadsKilling.Free;
  255.   csThreadManagment.Free;

  256.   csQueueManagment.Enter;
  257.   for i := FQueue.Count - 1 downto 0 do
  258.     TObject(FQueue[i]).Free;
  259.   FQueue.Free;
  260.   csQueueManagment.Free;

  261.   CloseHandle(hSemRequestCount);
  262.   CloseHandle(hTimCheckPoolDown);
  263.   inherited;
  264. end; { TThreadsPool.Destroy }

  265. function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles:
  266.   Boolean = False): Boolean;
  267. var
  268.   i: Integer;
  269. begin
  270. {$IFNDEF NOLOGS}
  271.   WriteLog('AddRequest(' + aDataObject.TextForLog + ')', 2);
  272. {$ENDIF}
  273.   Result := False;
  274.   csQueueManagment.Enter;
  275.   try
  276.     // 如果 CheckForDoubles = TRUE
  277.     // 则进行任务是否重复的检查
  278.     if CheckForDoubles then
  279.       for i := 0 to FQueue.Count - 1 do
  280.         if (FQueue[i] <> nil)
  281.           and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then
  282.           Exit; // 发现有相同的任务

  283.     csThreadManagment.Enter;
  284.     try
  285.       // 清除死线程,并补充不足的工作线程
  286.       CheckThreadsForGrow;

  287.       // 如果 CheckForDoubles = TRUE
  288.       // 则检查是否有相同的任务正在处理中
  289.       if CheckForDoubles then
  290.         for i := 0 to FThreads.Count - 1 do
  291.           if TProcessorThread(FThreads[i]).IamCurrentlyProcess(aDataObject) then
  292.             Exit; // 发现有相同的任务

  293.     finally
  294.       csThreadManagment.Leave;
  295.     end;

  296.     //将任务加入队列
  297.     FQueue.Add(aDataObject);

  298.     //释放一个同步信号量
  299.     ReleaseSemaphore(hSemRequestCount, 1, nil);
  300. {$IFNDEF NOLOGS}
  301.     WriteLog('释放一个同步信号量)', 1);
  302. {$ENDIF}
  303.     Result := True;
  304.   finally
  305.     csQueueManagment.Leave;
  306.   end;
  307. {$IFNDEF NOLOGS}
  308.   //调试信息
  309.   WriteLog('增加一个任务(' + aDataObject.TextForLog + ')', 1);
  310. {$ENDIF}
  311. end; { TThreadsPool.AddRequest }

  312. {
  313. 函 数 名:TThreadsPool.CheckPoolDown
  314. 功能描述:线程池停机(检查并清除空闲线程和死线程)
  315. 输入参数:无
  316. 返 回 值: 无
  317. 创建日期:2006.10.22 11:31
  318. 修改日期:2006.
  319. 作    者:Kook
  320. 附加说明:
  321. }

  322. procedure TThreadsPool.CheckPoolDown;
  323. var
  324.   i: Integer;
  325. begin
  326. {$IFNDEF NOLOGS}
  327.   WriteLog('TThreadsPool.CheckPoolDown', 1);
  328. {$ENDIF}
  329.   csThreadManagment.Enter;
  330.   try
  331. {$IFNDEF NOLOGS}
  332.     WriteLog(InfoText, 2);
  333. {$ENDIF}
  334.     // 清除死线程
  335.     KillDeadThreads;
  336.     // 释放 FThreadsKilling 列表中的线程
  337.     FreeFinishedThreads;

  338.     // 如果线程空闲,就终止它
  339.     for i := FThreads.Count - 1 downto FThreadsMin do
  340.       if TProcessorThread(FThreads[i]).isIdle then
  341.       begin
  342.         //发出终止命令
  343.         TProcessorThread(FThreads[i]).Terminate;
  344.         //加入待清除队列
  345.         FThreadsKilling.Add(FThreads[i]);
  346.         //从工作队列中除名
  347.         FThreads.Delete(i);
  348.         //todo: ??
  349.         Break;
  350.       end;
  351.   finally
  352.     csThreadManagment.Leave;
  353.   end;
  354. end; { TThreadsPool.CheckPoolDown }

  355. {
  356. 函 数 名:TThreadsPool.CheckThreadsForGrow
  357. 功能描述:清除死线程,并补充不足的工作线程
  358. 输入参数:无
  359. 返 回 值: 无
  360. 创建日期:2006.10.22 11:31
  361. 修改日期:2006.
  362. 作    者:Kook
  363. 附加说明:
  364. }

  365. procedure TThreadsPool.CheckThreadsForGrow;
  366. var
  367.   AvgWait: Integer;
  368.   i: Integer;
  369. begin
  370.   {
  371.     New thread created if:
  372.     新建线程的条件:
  373.       1. 工作线程数小于最小线程数
  374.       2. 工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms(系统忙)
  375.       3. 任务大于工作线程数的4倍
  376.   }

  377.   csThreadManagment.Enter;
  378.   try
  379.     KillDeadThreads;
  380.     if FThreads.Count < FThreadsMin then
  381.     begin
  382. {$IFNDEF NOLOGS}
  383.       WriteLog('工作线程数小于最小线程数', 4);
  384. {$ENDIF}
  385.       for i := FThreads.Count to FThreadsMin - 1 do
  386.       try
  387.         FThreads.Add(TProcessorThread.Create(Self));
  388.       except
  389.         on e: Exception do

  390.           WriteLog(
  391.             'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: '
  392.             + e.Message,
  393.             9
  394.             );
  395.       end
  396.     end
  397.     else if FThreads.Count < FThreadsMax then
  398.     begin
  399. {$IFNDEF NOLOGS}
  400.       WriteLog('工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms', 3);
  401. {$ENDIF}
  402.       AvgWait := PoolAverageWaitingTime;
  403. {$IFNDEF NOLOGS}
  404.       WriteLog(Format(
  405.         'FThreads.Count (%d)<FThreadsMax(%d), AvgWait=%d',
  406.         [FThreads.Count, FThreadsMax, AvgWait]),
  407.         4
  408.         );
  409. {$ENDIF}

  410.       if AvgWait < 100 then
  411.       try
  412.         FThreads.Add(TProcessorThread.Create(Self));
  413.       except
  414.         on e: Exception do
  415.           WriteLog(
  416.             'TProcessorThread.Create raise: ' + e.ClassName +
  417.             #13#10#9'Message: ' + e.Message,
  418.             9
  419.             );
  420.       end;
  421.     end;
  422.   finally
  423.     csThreadManagment.Leave;
  424.   end;
  425. end; { TThreadsPool.CheckThreadsForGrow }

  426. procedure TThreadsPool.DoProcessed;
  427. var
  428.   i: Integer;
  429. begin
  430.   if (FLastGetPoint < FQueue.Count) then
  431.     Exit;
  432.   csThreadManagment.Enter;
  433.   try
  434.     for i := 0 to FThreads.Count - 1 do
  435.       if TProcessorThread(FThreads[i]).FCurState in [tcsProcessing] then
  436.         Exit;
  437.   finally
  438.     csThreadManagment.Leave;
  439.   end;
  440.   DoQueueEmpty(ekProcessingFinished);
  441. end; { TThreadsPool.DoProcessed }

  442. procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread:
  443.   TProcessorThread);
  444. begin
  445.   if Assigned(FProcessRequest) then
  446.     FProcessRequest(Self, aDataObj, aThread);
  447. end; { TThreadsPool.DoProcessRequest }

  448. procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind);
  449. begin
  450.   if Assigned(FQueueEmpty) then
  451.     FQueueEmpty(Self, EmptyKind);
  452. end; { TThreadsPool.DoQueueEmpty }

  453. procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread);
  454. begin
  455.   if Assigned(FThreadFinalizing) then
  456.     FThreadFinalizing(Self, aThread);
  457. end; { TThreadsPool.DoThreadFinalizing }

  458. procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread);
  459. begin
  460.   if Assigned(FThreadInitializing) then
  461.     FThreadInitializing(Self, aThread);
  462. end; { TThreadsPool.DoThreadInitializing }

  463. {
  464. 函 数 名:TThreadsPool.FreeFinishedThreads
  465. 功能描述:释放 FThreadsKilling 列表中的线程
  466. 输入参数:无
  467. 返 回 值: 无
  468. 创建日期:2006.10.22 11:34
  469. 修改日期:2006.
  470. 作    者:Kook
  471. 附加说明:
  472. }

  473. procedure TThreadsPool.FreeFinishedThreads;
  474. var
  475.   i: Integer;
  476. begin
  477.   if csThreadManagment.TryEnter then
  478.   try
  479.     for i := FThreadsKilling.Count - 1 downto 0 do
  480.       if TProcessorThread(FThreadsKilling[i]).isFinished then
  481.       begin
  482.         TProcessorThread(FThreadsKilling[i]).Free;
  483.         FThreadsKilling.Delete(i);
  484.       end;
  485.   finally
  486.     csThreadManagment.Leave
  487.   end;
  488. end; { TThreadsPool.FreeFinishedThreads }

  489. {
  490. 函 数 名:TThreadsPool.GetRequest
  491. 功能描述:申请任务
  492. 输入参数:out Request: TRequestDataObject
  493. 返 回 值: 无
  494. 创建日期:2006.10.22 11:34
  495. 修改日期:2006.
  496. 作    者:Kook
  497. 附加说明:
  498. }

  499. procedure TThreadsPool.GetRequest(out Request: TWorkItem);
  500. begin
  501. {$IFNDEF NOLOGS}
  502.   WriteLog('申请任务', 2);
  503. {$ENDIF}
  504.   csQueueManagment.Enter;
  505.   try
  506.     //跳过空的队列元素
  507.     while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do
  508.       Inc(FLastGetPoint);

  509.     Assert(FLastGetPoint < FQueue.Count);
  510.     //压缩队列,清除空元素
  511.     if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then
  512.     begin
  513. {$IFNDEF NOLOGS}
  514.       WriteLog('FQueue.Pack', 1);
  515. {$ENDIF}
  516.       FQueue.Pack;
  517.       FLastGetPoint := 0;
  518.     end;

  519.     Request := TWorkItem(FQueue[FLastGetPoint]);
  520.     FQueue[FLastGetPoint] := nil;
  521.     inc(FLastGetPoint);
  522.     if (FLastGetPoint = FQueue.Count) then //如果队列中无任务
  523.     begin

  524.       DoQueueEmpty(ekQueueEmpty);
  525.       FQueue.Clear;
  526.       FLastGetPoint := 0;
  527.     end;
  528.   finally
  529.     csQueueManagment.Leave;
  530.   end;
  531. end; { TThreadsPool.GetRequest }

  532. function TThreadsPool.InfoText: string;
  533. begin
  534.   Result := '';
  535.   //end;
  536.   //{$ELSE}
  537.   //var
  538.   //  i: Integer;
  539.   //begin
  540.   //  csQueueManagment.Enter;
  541.   //  csThreadManagment.Enter;
  542.   //  try
  543.   //    if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and
  544.   //      TProcessorThread(FThreadsKilling[0]).isFinished then
  545.   //      FreeFinishedThreads;
  546.   //
  547.   //    Result := Format(
  548.   //      'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10,
  549.   //      [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count,
  550.   //      FQueue.Count]
  551.   //        );
  552.   //    if FThreads.Count > 0 then
  553.   //      Result := Result + 'Working threads:'#13#10;
  554.   //    for i := 0 to FThreads.Count - 1 do
  555.   //      Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10;
  556.   //    if FThreadsKilling.Count > 0 then
  557.   //      Result := Result + 'Terminated threads:'#13#10;
  558.   //    for i := 0 to FThreadsKilling.Count - 1 do
  559.   //      Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10;
  560.   //  finally
  561.   //    csThreadManagment.Leave;
  562.   //    csQueueManagment.Leave;
  563.   //  end;
  564.   //end;
  565.   //{$ENDIF}
  566. end; { TThreadsPool.InfoText }

  567. {
  568. 函 数 名:TThreadsPool.KillDeadThreads
  569. 功能描述:清除死线程
  570. 输入参数:无
  571. 返 回 值: 无
  572. 创建日期:2006.10.22 11:32
  573. 修改日期:2006.
  574. 作    者:Kook
  575. 附加说明:
  576. }

  577. procedure TThreadsPool.KillDeadThreads;
  578. var
  579.   i: Integer;
  580. begin
  581.   // Check for dead threads
  582.   if csThreadManagment.TryEnter then
  583.   try
  584.     for i := 0 to FThreads.Count - 1 do
  585.       if TProcessorThread(FThreads[i]).IsDead then
  586.       begin
  587.         // Dead thread moverd to other list.
  588.         // New thread created to replace dead one
  589.         TProcessorThread(FThreads[i]).Terminate;
  590.         FThreadsKilling.Add(FThreads[i]);
  591.         try
  592.           FThreads[i] := TProcessorThread.Create(Self);
  593.         except
  594.           on e: Exception do
  595.           begin
  596.             FThreads[i] := nil;
  597. {$IFNDEF NOLOGS}
  598.             WriteLog(
  599.               'TProcessorThread.Create raise: ' + e.ClassName +
  600.               #13#10#9'Message: ' + e.Message,
  601.               9
  602.               );
  603. {$ENDIF}
  604.           end;
  605.         end;
  606.       end;
  607.   finally
  608.     csThreadManagment.Leave
  609.   end;
  610. end; { TThreadsPool.KillDeadThreads }

  611. function TThreadsPool.PoolAverageWaitingTime: Integer;
  612. var
  613.   i: Integer;
  614. begin
  615.   Result := 0;
  616.   if FThreads.Count > 0 then
  617.   begin
  618.     for i := 0 to FThreads.Count - 1 do
  619.       Inc(result, TProcessorThread(FThreads[i]).AverageWaitingTime);
  620.     Result := Result div FThreads.Count
  621.   end
  622.   else
  623.     Result := 1;
  624. end; { TThreadsPool.PoolAverageWaitingTime }

  625. procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0);
  626. begin
  627. {$IFNDEF NOLOGS}
  628.   uThreadPool.WriteLog(Str, 0, Level);
  629. {$ENDIF}
  630. end; { TThreadsPool.WriteLog }

  631. // 工作线程仅用于线程池内, 不要直接创建并调用它。
  632. {
  633. ******************************* TProcessorThread *******************************
  634. }

  635. constructor TProcessorThread.Create(APool: TThreadsPool);
  636. begin
  637.   WriteLog('创建工作线程', 5);
  638.   inherited Create(True);
  639.   FPool := aPool;

  640.   FAverageWaitingTime := 1000;
  641.   FAverageProcessing := 3000;

  642.   sInitError := '';
  643.   {
  644.   各参数的意义如下:
  645.    
  646.   参数一:填上 nil 即可。
  647.   参数二:是否采用手动调整灯号。
  648.   参数三:灯号的起始状态,False 表示红灯。
  649.   参数四:Event 名称, 对象名称相同的话,会指向同一个对象,所以想要有两个Event对象,便要有两个不同的名称(这名称以字符串来存.为NIL的话系统每次会自己创建一个不同的名字,就是被次创建的都是新的EVENT)。
  650.   传回值:Event handle。
  651.   }
  652.   hInitFinished := CreateEvent(nil, True, False, nil);
  653.   hThreadTerminated := CreateEvent(nil, True, False, nil);
  654.   csProcessingDataObject := TCriticalSection.Create;
  655.   try
  656.     WriteLog('TProcessorThread.Create::Resume', 3);
  657.     Resume;
  658.     //阻塞, 等待初始化完成
  659.     WaitForSingleObject(hInitFinished, INFINITE);
  660.     if sInitError <> '' then
  661.       raise Exception.Create(sInitError);
  662.   finally
  663.     CloseHandle(hInitFinished);
  664.   end;
  665.   WriteLog('TProcessorThread.Create::Finished', 3);
  666. end; { TProcessorThread.Create }

  667. destructor TProcessorThread.Destroy;
  668. begin
  669.   WriteLog('工作线程销毁', 5);
  670.   CloseHandle(hThreadTerminated);
  671.   csProcessingDataObject.Free;
  672.   inherited;
  673. end; { TProcessorThread.Destroy }

  674. function TProcessorThread.AverageProcessingTime: DWORD;
  675. begin
  676.   if (FCurState in [tcsProcessing]) then
  677.     Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart)
  678.   else
  679.     Result := FAverageProcessing
  680. end; { TProcessorThread.AverageProcessingTime }

  681. function TProcessorThread.AverageWaitingTime: DWORD;
  682. begin
  683.   if (FCurState in [tcsWaiting, tcsCheckingDown]) then
  684.     Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart)
  685.   else
  686.     Result := FAverageWaitingTime
  687. end; { TProcessorThread.AverageWaitingTime }

  688. procedure TProcessorThread.Execute;

  689. type
  690.   THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown);
  691. var
  692.   WaitedTime: Integer;
  693.   Handles: array[THandleID] of THandle;

  694. begin
  695.   WriteLog('工作线程进常运行', 3);
  696.   //当前状态:初始化
  697.   FCurState := tcsInitializing;
  698.   try
  699.     //执行外部事件
  700.     FPool.DoThreadInitializing(Self);
  701.   except
  702.     on e: Exception do
  703.       sInitError := e.Message;
  704.   end;

  705.   //初始化完成,初始化Event绿灯
  706.   SetEvent(hInitFinished);

  707.   WriteLog('TProcessorThread.Execute::Initialized', 3);

  708.   //引用线程池的同步 Event
  709.   Handles[hidTerminateThread] := hThreadTerminated;
  710.   Handles[hidRequest] := FPool.hSemRequestCount;
  711.   Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown;

  712.   //时间戳,
  713.   //todo: 好像在线程中用 GetTickCount; 会不正常
  714.   uWaitingStart := GetTickCount;
  715.   //任务置空
  716.   FProcessingDataObject := nil;

  717.   //大巡环
  718.   while not terminated do
  719.   begin
  720.     //当前状态:等待
  721.     FCurState := tcsWaiting;
  722.     //阻塞线程,使线程休眠
  723.     case WaitForMultipleObjects(Length(Handles), @Handles, False, INFINITE) -
  724.       WAIT_OBJECT_0 of

  725.       WAIT_OBJECT_0 + ord(hidTerminateThread):
  726.         begin
  727.           WriteLog('TProcessorThread.Execute:: Terminate event signaled ', 5);
  728.           //当前状态:正在终止线程
  729.           FCurState := tcsTerminating;
  730.           //退出大巡环(结束线程)
  731.           Break;
  732.         end;

  733.       WAIT_OBJECT_0 + ord(hidRequest):
  734.         begin
  735.           WriteLog('TProcessorThread.Execute:: Request semaphore signaled ', 3);
  736.           //等待的时间
  737.           WaitedTime := GetTickCount - uWaitingStart;
  738.           //重新计算平均等待时间
  739.           FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime);
  740.           //当前状态:申请任务
  741.           FCurState := tcsGetting;
  742.           //如果等待时间过短,则检查工作线程是否足够
  743.           if WaitedTime < 5 then
  744.             FPool.CheckThreadsForGrow;
  745.           //从线程池的任务队列中得到任务
  746.           FPool.GetRequest(FProcessingDataObject);
  747.           //开始处理的时间戳
  748.           uProcessingStart := GetTickCount;
  749.           //当前状态:执行任务
  750.           FCurState := tcsProcessing;
  751.           try
  752. {$IFNDEF NOLOGS}
  753.             WriteLog('Processing: ' + FProcessingDataObject.TextForLog, 2);
  754. {$ENDIF}
  755.             //执行任务
  756.             FPool.DoProcessRequest(FProcessingDataObject, Self);
  757.           except
  758.             on e: Exception do
  759.               WriteLog(
  760.                 'OnProcessRequest for ' + FProcessingDataObject.TextForLog +
  761.                 #13#10'raise Exception: ' + e.Message,
  762.                 8
  763.                 );
  764.           end;

  765.           //释放任务对象
  766.           csProcessingDataObject.Enter;
  767.           try
  768.             FProcessingDataObject.Free;
  769.             FProcessingDataObject := nil;
  770.           finally
  771.             csProcessingDataObject.Leave;
  772.           end;
  773.           //重新计算
  774.           FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount -
  775.             uProcessingStart);
  776.           //当前状态:执行任务完毕
  777.           FCurState := tcsProcessed;
  778.           //执行线程外事件
  779.           FPool.DoProcessed;

  780.           uWaitingStart := GetTickCount;
  781.         end;
  782.       WAIT_OBJECT_0 + ord(hidCheckPoolDown):
  783.         begin
  784.           // !!! Never called under Win9x
  785.           WriteLog('TProcessorThread.Execute:: CheckPoolDown timer signaled ',
  786.             4);
  787.           //当前状态:线程池停机(检查并清除空闲线程和死线程)
  788.           FCurState := tcsCheckingDown;
  789.           FPool.CheckPoolDown;
  790.         end;
  791.     end;
  792.   end;
  793.   FCurState := tcsTerminating;

  794.   FPool.DoThreadFinalizing(Self);
  795. end; { TProcessorThread.Execute }

  796. function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
  797. begin
  798.   csProcessingDataObject.Enter;
  799.   try
  800.     Result := (FProcessingDataObject <> nil) and
  801.       DataObj.IsTheSame(FProcessingDataObject);
  802.   finally
  803.     csProcessingDataObject.Leave;
  804.   end;
  805. end; { TProcessorThread.IamCurrentlyProcess }

  806. function TProcessorThread.InfoText: string;

  807. const
  808.   ThreadStateNames: array[TThreadState] of string =
  809.   (
  810.     'tcsInitializing',
  811.     'tcsWaiting',
  812.     'tcsGetting',
  813.     'tcsProcessing',
  814.     'tcsProcessed',
  815.     'tcsTerminating',
  816.     'tcsCheckingDown'
  817.     );

  818. begin
  819. {$IFNDEF NOLOGS}
  820.   Result := Format(
  821.     '%5d: %15s, AverageWaitingTime=%6d, AverageProcessingTime=%6d',
  822.     [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime,
  823.     AverageProcessingTime]
  824.       );
  825.   case FCurState of
  826.     tcsWaiting:
  827.       Result := Result + ', WaitingTime=' + IntToStr(GetTickCount -
  828.         uWaitingStart);
  829.     tcsProcessing:
  830.       Result := Result + ', ProcessingTime=' + IntToStr(GetTickCount -
  831.         uProcessingStart);
  832.   end;

  833.   csProcessingDataObject.Enter;
  834.   try
  835.     if FProcessingDataObject <> nil then
  836.       Result := Result + ' ' + FProcessingDataObject.TextForLog;
  837.   finally
  838.     csProcessingDataObject.Leave;
  839.   end;
  840. {$ENDIF}
  841. end; { TProcessorThread.InfoText }

  842. function TProcessorThread.IsDead: Boolean;
  843. begin
  844.   Result :=
  845.     Terminated or
  846.     (FPool.ThreadDeadTimeout > 0) and (FCurState = tcsProcessing) and
  847.     (GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout);
  848.   if Result then
  849.     WriteLog('Thread dead', 5);
  850. end; { TProcessorThread.IsDead }

  851. function TProcessorThread.isFinished: Boolean;
  852. begin
  853.   Result := WaitForSingleObject(Handle, 0) = WAIT_OBJECT_0;
  854. end; { TProcessorThread.isFinished }

  855. function TProcessorThread.isIdle: Boolean;
  856. begin
  857.   // 如果线程状态是 tcsWaiting, tcsCheckingDown
  858.   // 并且 空间时间 > 100ms,
  859.   // 并且 平均等候任务时间大于平均工作时间的 50%
  860.   // 则视为空闲。
  861.   Result :=
  862.     (FCurState in [tcsWaiting, tcsCheckingDown]) and
  863.     (AverageWaitingTime > 100) and
  864.     (AverageWaitingTime * 2 > AverageProcessingTime);
  865. end; { TProcessorThread.isIdle }

  866. function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer;
  867. begin
  868.   Result := (OldAvg * 2 + NewVal) div 3;
  869. end; { TProcessorThread.NewAverage }

  870. procedure TProcessorThread.Terminate;
  871. begin
  872.   WriteLog('TProcessorThread.Terminate', 5);
  873.   inherited Terminate;
  874.   SetEvent(hThreadTerminated);
  875. end; { TProcessorThread.Terminate }

  876. procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0);
  877. begin
  878. {$IFNDEF NOLOGS}
  879.   uThreadPool.WriteLog(Str, ThreadID, Level);
  880. {$ENDIF}
  881. end; { TProcessorThread.WriteLog }

  882. {
  883. ******************************* TCriticalSection *******************************
  884. }

  885. constructor TCriticalSection.Create;
  886. begin
  887.   InitializeCriticalSection(FSection);
  888. end; { TCriticalSection.Create }

  889. destructor TCriticalSection.Destroy;
  890. begin
  891.   DeleteCriticalSection(FSection);
  892. end; { TCriticalSection.Destroy }

  893. procedure TCriticalSection.Enter;
  894. begin
  895.   EnterCriticalSection(FSection);
  896. end; { TCriticalSection.Enter }

  897. procedure TCriticalSection.Leave;
  898. begin
  899.   LeaveCriticalSection(FSection);
  900. end; { TCriticalSection.Leave }

  901. function TCriticalSection.TryEnter: Boolean;
  902. begin
  903.   Result := TryEnterCriticalSection(FSection);
  904. end; { TCriticalSection.TryEnter }

  905. procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0);
  906. begin
  907. end;

  908. initialization
  909.   WriteLog := NoLogs;
  910. end.
复制代码
使用方法
  1. // 创建线程池
  2. FThreadPool := TThreadsPool.Create(Self); // 创建线程池
  3. FThreadPool.ThreadsMin := 5; // 初始工作线程数
  4. FThreadPool.ThreadsMax := 50; // 最大允许工作线程数
  5. FThreadPool.OnProcessRequest := DealwithCommRecvData; // 线程工作函数(DealwithCommRecvData在工作者线程的Execute方法中被调用)


  6. // 使用线程池
  7. var
  8.  AWorkItem: TRecvCommDataWorkItem; // 继承自TWorkItem
  9. begin
  10.  AWorkItem := TRecvCommDataWorkItem.Create;
  11.  Move(PData[0], AWorkItem.FRecvData[0], PDataLen);
  12.  AWorkItem.FRecvDataLen := PDataLen;
  13.  FThreadPool.AddRequest(AWorkItem); // 向线程池分配一个任务
  14. end;
复制代码
source:http://blog.csdn.net/babyvspp/archive/2008/01/01/2008234.aspx
source:http://www.delphifans.com/SoftView/SoftView_2722.html
本主题由 皇帝 5207 于 2009-7-3 22:59:21 执行 主题分类 操作
分享 转发
相信与不相信都是矛盾的.  5207宣!
欢迎您来到点滴论坛
TOP