mormot.core.threads--TSynThreadPool

海利鸟 2024-07-10 11:39:00 阅读 93

mormot.core.threads--TSynThreadPool

<code>{ ************ 面向服务器进程的线程池 }

TSynThreadPool = class; // 前向声明TSynThreadPool类

/// 定义了TSynThreadPool所使用的工作线程

TSynThreadPoolWorkThread = class(TSynThread)

protected

fOwner: TSynThreadPool; // 线程池所有者

fThreadNumber: integer; // 线程编号

{$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口

fProcessingContext: pointer; // 正在处理的上下文,受fOwner.fSafe.Lock保护

fEvent: TSynEvent; // 同步事件

{$endif USE_WINIOCP}

procedure NotifyThreadStart(Sender: TSynThread); // 通知线程开始

procedure DoTask(Context: pointer); // 异常安全地调用fOwner.Task()

public

/// 初始化线程

constructor Create(Owner: TSynThreadPool); reintroduce;

/// 终结线程

destructor Destroy; override;

/// 循环等待并执行挂起的任务,通过调用fOwner.Task()

procedure Execute; override;

/// 关联的线程池

property Owner: TSynThreadPool

read fOwner;

end;

TSynThreadPoolWorkThreads = array of TSynThreadPoolWorkThread; // 线程池工作线程数组类型

/// 一个简单的线程池,用于例如快速处理HTTP/1.0请求

// - 在Windows上通过I/O完成端口实现,或在Linux/POSIX上使用经典的事件驱动方法

TSynThreadPool = class

protected

{$ifndef USE_WINIOCP} // 如果不使用Windows I/O完成端口

fSafe: TOSLightLock; // 使用更稳定的锁

{$endif USE_WINIOCP}

fWorkThread: TSynThreadPoolWorkThreads; // 工作线程数组

fWorkThreadCount: integer; // 工作线程数量

fRunningThreads: integer; // 正在运行的线程数量

fExceptionsCount: integer; // 异常计数(未在代码中明确使用,但可能用于调试或监控)

fContentionAbortDelay: integer; // 由于争用而拒绝连接的延迟时间(毫秒)

fOnThreadTerminate: TOnNotifyThread; // 线程终止通知事件

fOnThreadStart: TOnNotifyThread; // 线程开始通知事件

fContentionTime: Int64; // 等待队列可用槽位的总时间(毫秒)

fContentionAbortCount: cardinal; // 由于争用而中止的任务数

fContentionCount: cardinal; // 等待队列可用槽位的次数

fName: RawUtf8; // 线程池名称

fTerminated: boolean; // 线程池是否已终止

{$ifdef USE_WINIOCP} // 如果使用Windows I/O完成端口

fRequestQueue: THandle; // IOCP有其自己的内部队列

{$else}

fQueuePendingContext: boolean; // 当所有线程都忙时,是否应维护一个内部队列

fPendingContext: array of pointer; // 挂起的上下文数组

fPendingContextCount: integer; // 挂起的上下文数量

function GetPendingContextCount: integer; // 获取挂起上下文数量的函数

function PopPendingContext: pointer; // 从挂起上下文数组中弹出一个元素的函数

function QueueLength: integer; virtual; // 获取队列长度的虚拟函数(可能用于调试)

{$endif USE_WINIOCP}

/// 在I/O错误时结束线程

function NeedStopOnIOError: boolean; virtual;

/// 在通知后要执行的进程,这是一个抽象方法,需要被子类实现

procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: pointer); virtual; abstract;

/// 中止任务的进程

procedure TaskAbort(aContext: Pointer); virtual;

public

/// 使用指定的线程数初始化线程池

// - 抽象的Task()方法将由其中一个线程调用

// - 一个线程池最多可以关联256个线程

// - 在Windows上,可以可选地接受一个之前使用Windows重叠I/O(IOCP)打开的aOverlapHandle

// - 在POSIX上,如果aQueuePendingContext=true,则将挂起的上下文存储到内部队列中,

// 以便在队列未满时Push()返回true

{$ifdef USE_WINIOCP}

constructor Create(NumberOfThreads: integer = 32; aOverlapHandle: THandle = INVALID_HANDLE_VALUE; const aName: RawUtf8 = '');

{$else}

constructor Create(NumberOfThreads: integer = 32; aQueuePendingContext: boolean = false; const aName: RawUtf8 = '');

{$endif USE_WINIOCP}

/// 关闭线程池,释放所有关联的线程

destructor Destroy; override;

/// 让线程池处理一个指定的任务(作为指针)

// - 如果没有空闲线程可用,并且Create(aQueuePendingContext=false)被使用,则返回false(调用者稍后应重试)

// - 如果在Create中aQueuePendingContext为true,或使用了IOCP,则提供的上下文将被添加到内部列表,并在可能时处理

// - 如果aWaitOnContention默认为false,则在队列满时立即返回

// - 设置aWaitOnContention=true以等待最多ContentionAbortDelay毫秒并重试将任务排队

function Push(aContext: pointer; aWaitOnContention: boolean = false): boolean;

{$ifndef USE_WINIOCP}

/// 在Push()返回false后调用,以查看队列是否确实已满

// - 如果QueuePendingContext为false,则返回false

function QueueIsFull: boolean;

/// 如果所有线程都忙时,线程池是否应维护一个内部队列

// - 作为Create构造函数的参数提供

property QueuePendingContext: boolean

read fQueuePendingContext;

{$endif USE_WINIOCP}

/// 对此线程池中定义的线程的低级访问

property WorkThread: TSynThreadPoolWorkThreads

read fWorkThread;

published

/// 线程池中可用的线程数

// - 映射Create()参数,即默认为32

property WorkThreadCount: integer

read fWorkThreadCount;

/// 当前在此线程池中处理任务的线程数

// - 范围在0..WorkThreadCount之间

property RunningThreads: integer

read fRunningThreads;

/// 由于线程池争用而被拒绝的任务数

// - 如果此数字很高,请考虑设置更高的线程数,或分析并调整Task方法

property ContentionAbortCount: cardinal

read fContentionAbortCount;

/// 由于争用而拒绝连接的延迟时间(毫秒)

// - 默认为5000,即等待IOCP或aQueuePendingContext内部列表中有空间可用5秒

// - 在此延迟期间,不接受新的连接(即不调用Accept),以便负载均衡器可以检测到争用并切换到池中的另一个实例,

// 或直接客户端最终可能会拒绝其连接,因此不会开始发送数据

property ContentionAbortDelay: integer

read fContentionAbortDelay write fContentionAbortDelay;

/// 等待队列中可用槽位的总时间(毫秒)

// - 争用不会立即失败,但会重试直到ContentionAbortDelay

// - 此处的高数值需要对Task方法进行代码重构

property ContentionTime: Int64

read fContentionTime;

/// 线程池等待队列中可用槽位的次数

// - 争用不会立即失败,但会重试直到ContentionAbortDelay

// - 此处的高数值可能需要增加线程数

// - 使用此属性和ContentionTime来计算平均争用时间

property ContentionCount: cardinal

read fContentionCount;

{$ifndef USE_WINIOCP}

/// 当前等待分配给线程的输入任务数

property PendingContextCount: integer

read GetPendingContextCount;

{$endif USE_WINIOCP}

end;

{$M-} // 关闭内存管理消息

const

// 允许TSynThreadPoolWorkThread堆栈使用最多256 * 2MB = 512MB的RAM

THREADPOOL_MAXTHREADS = 256;

由于 TSynThreadPool类是一个高度抽象且依赖于特定实现的类(如它可能使用Windows的I/O完成端口或Linux/POSIX的事件驱动机制),编写一个完整的例程代码可能会相当复杂,并且需要模拟或实际实现这些依赖项。然而,我可以提供一个简化的示例,该示例展示了如何创建 TSynThreadPool实例、如何向其推送任务,并如何大致实现 Task方法。

请注意,以下代码是一个高度简化的示例,并不包含所有 TSynThreadPool类定义中的功能,特别是与Windows I/O完成端口或Linux/POSIX事件驱动机制相关的部分。此外,由于 TSynThreadPool是一个假设的类(因为它不是Delphi标准库或广泛认可的第三方库的一部分),我将基于您提供的类定义来编写这个示例。

program TSynThreadPoolDemo;

{$MODE DELPHI}

uses

SysUtils, Classes; // 引入必要的单元

// 假设TSynThreadPool及其依赖项已经在某个单元中定义

// 这里我们使用一个占位符单元名YourThreadPoolUnit

uses YourThreadPoolUnit;

// 一个简单的任务上下文类(仅作为示例)

type

TMyTaskContext = record

Data: Integer;

end;

// TSynThreadPool的Task方法的实现类

type

TMyThreadPool = class(TSynThreadPool)

protected

procedure Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer); override;

end;

{ TMyThreadPool }

procedure TMyThreadPool.Task(aCaller: TSynThreadPoolWorkThread; aContext: Pointer);

var

Ctx: PMyTaskContext;

begin

Ctx := PMyTaskContext(aContext);

WriteLn('Processing task with data: ', Ctx.Data);

// 在这里添加处理任务的代码

// ...

end;

var

Pool: TMyThreadPool;

Ctx: TMyTaskContext;

I: Integer;

begin

try

// 创建一个线程池实例,假设我们想要使用4个工作线程

Pool := TMyThreadPool.Create(4);

try

// 模拟向线程池推送一些任务

for I := 1 to 10 do

begin

Ctx.Data := I;

if not Pool.Push(@Ctx) then

begin

// 在这个简化的示例中,我们不会处理Push返回false的情况

// 在实际应用中,您可能需要等待、重试或将任务放入另一个队列中

WriteLn('Failed to push task to the pool (this should not happen in this simplified example)');

end;

end;

// 在这个简化的示例中,我们没有等待所有任务完成

// 在实际应用中,您可能需要等待线程池中的所有任务都完成后再继续

// ...

finally

// 销毁线程池实例,这将释放所有关联的资源

Pool.Free;

end;

except

on E: Exception do

WriteLn('An error occurred: ', E.Message);

end;

// 保持控制台窗口打开,直到用户按任意键

WriteLn('Press Enter to exit...');

ReadLn;

end.

// 注意:由于TSynThreadPool是一个假设的类,并且上述代码没有实现所有细节(如线程池的实际工作线程管理、任务队列等),

// 因此这个示例主要是为了展示如何使用该类(如果它存在的话)的大致结构。

// 在实际应用中,您需要根据TSynThreadPool类的具体实现来调整此代码。

重要说明

  1. 占位符单元:在上述代码中,我使用了 YourThreadPoolUnit作为包含 TSynThreadPool类定义的占位符单元名。在实际应用中,您需要将其替换为包含该类定义的实际单元名。
  2. 任务上下文:我定义了一个简单的 TMyTaskContext记录类型来作为任务的上下文。在实际应用中,您可能需要根据需要定义更复杂的上下文类型。
  3. 错误处理:在 Push方法返回 false的情况下,上述代码仅打印了一条消息,并没有采取任何恢复措施。在实际应用中,您可能需要实现更复杂的错误处理逻辑(如重试、等待或将任务放入另一个队列中)。
  4. 等待任务完成:上述代码没有等待线程池中的所有任务都完成。在实际应用中,您可能需要实现某种形式的等待机制(例如,使用同步事件或计数器)来确保所有任务都已完成后再继续执行后续代码。
  5. 线程池实现:由于 TSynThreadPool是一个假设的类,并且其实现细节(如工作线程的管理、任务队列的实现等)并未在您的类定义中给出,因此上述代码仅提供了一个大致的框架。在实际应用中,您需要根据 TSynThreadPool类的具体实现来调整此代码。


声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。