diff --git a/docker/examples/benchmarks/benchmarks_llm_11k_prompt.txt b/docker/examples/benchmarks/benchmarks_llm_11k_prompt.txt new file mode 100644 index 000000000..25072fa18 --- /dev/null +++ b/docker/examples/benchmarks/benchmarks_llm_11k_prompt.txt @@ -0,0 +1,214 @@ +A chat between a curious user and an artificial intelligence assistant, who very familiar with database related knowledge. +The assistant gives helpful, detailed, professional and polite answers to the user's questions. 基于以下已知的信息, 专业、简要的回答用户的问题, + 如果无法从提供的内容中获取答案, 请说: "知识库中提供的内容不足以回答此问题" 禁止胡乱编造, 回答的时候最好按照1.2.3.点进行总结。 + 已知内容: + +OceanBase 数据库(OceanBase Database)是一款完全自研的企业级原生分布式数据库,在普通硬件上实现金融级高可用,首创“三地五中心”城市级故障自动无损容灾新标准,刷新 TPC-C 标准测试,单集群规模超过 1500 节点,具有云原生、强一致性、高度兼容 Oracle/MySQL 等特性。 + +核心特性 +高可用 +独创 “三地五中心” 容灾架构方案,建立金融行业无损容灾新标准。支持同城/异地容灾,可实现多地多活,满足金融行业 6 级容灾标准(RPO=0,RTO< 8s),数据零丢失。 +高兼容 +高度兼容 Oracle 和 MySQL,覆盖绝大多数常见功能,支持过程语言、触发器等高级特性,提供自动迁移工具,支持迁移评估和反向同步以保障数据迁移安全,可支撑金融、政府、运营商等关键行业核心场景替代。 +水平扩展 +实现透明水平扩展,支持业务快速的扩容缩容,同时通过准内存处理架构实现高性能。支持集群节点超过数千个,单集群最大数据量超过 3PB,最大单表行数达万亿级。 +低成本 +基于 LSM-Tree 的高压缩引擎,存储成本降低 70% - 90%;原生支持多租户架构,同集群可为多个独立业务提供服务,租户间数据隔离,降低部署和运维成本。 +实时 HTAP +基于“同一份数据,同一个引擎”,同时支持在线实时交易及实时分析两种场景,“一份数据”的多个副本可以存储成多种形态,用于不同工作负载,从根本上保持数据一致性。 +安全可靠 +12 年完全自主研发,代码级可控,自研单机分布式一体化架构,大规模金融核心场景 9 年可靠性验证;完备的角色权限管理体系,数据存储和通信全链路透明加密,支持国密算法,通过等保三级专项合规检测。 +深入了解 OceanBase 数据库 +您可以通过以下内容更深入地了解 OceanBase 数据库: + +OceanBase 使用通用服务器硬件,依赖本地存储,分布式部署使用的多个服务器也是对等的,没有特殊的硬件要求。OceanBase 的分布式数据库处理采用 Shared Nothing 架构,数据库内的 SQL 执行引擎具有分布式执行能力。 + +OceanBase 在服务器上会运行叫做 observer 的单进程程序作为数据库的运行实例,使用本地的文件存储数据和事务 Redo 日志。 + +OceanBase 集群部署需要配置可用区(Zone),由若干个服务器组成。可用区是一个逻辑概念,表示集群内具有相似硬件可用性的一组节点,它在不同的部署模式下代表不同的含义。例如,当整个集群部署在同一个数据中心(IDC)内的时候,一个可用区的节点可以属于同一个机架,同一个交换机等。当集群分布在多个数据中心的时候,每个可用区可以对应于一个数据中心。 + +用户存储的数据在分布式集群内部可以存储多个副本,用于故障容灾,也可以用于分散读取压力。在一个可用区内部数据只有一个副本,不同的可用区可以存储同一个数据的多个副本,副本之间由共识协议保证数据的一致性。 + +OceanBase 内置多租户特性,每个租户对于使用者是一个独立的数据库,一个租户能够在租户级别设置租户的分布式部署方式。租户之间 CPU、内存和 IO 都是隔离的。 + +OceanBase的数据库实例内部由不同的组件相互协作,这些组件从底层向上由存储层、复制层、均衡层、事务层、SQL 层、接入层组成。 + +存储层 +存储层以一张表或者一个分区为粒度提供数据存储与访问,每个分区对应一个用于存储数据的Tablet(分片),用户定义的非分区表也会对应一个 Tablet。 + +Tablet 的内部是分层存储的结构,总共有 4 层。DML 操作插入、更新、删除等首先写入 MemTable,等到 MemTable 达到一定大小时转储到磁盘成为 L0 SSTable。L0 SSTable 个数达到阈值后会将多个 L0 SSTable 合并成一个 L1 SSTable。在每天配置的业务低峰期,系统会将所有的 MemTable、L0 SSTable 和 L1 SSTable 合并成一个 Major SSTable。 + +每个 SSTable 内部是以 2MB 定长宏块为基本单位,每个宏块内部由多个不定长微块组成。 + +Major SSTable 的微块会在合并过程中用编码方式进行格式转换,微块内的数据会按照列维度分别进行列内的编码,编码规则包括字典/游程/常量/差值等,每一列压缩结束后,还会进一步对多列进行列间等值/子串等规则编码。编码能对数据大幅压缩,同时提炼的列内特征信息还能进一步加速后续的查询速度。 + +在编码压缩之后,还可以根据用户指定的通用压缩算法进行无损压缩,进一步提升数据压缩率。 + +复制层 +复制层使用日志流(LS、Log Stream)在多副本之间同步状态。每个 Tablet 都会对应一个确定的日志流,DML 操作写入 Tablet 的数据所产生的 Redo 日志会持久化在日志流中。日志流的多个副本会分布在不同的可用区中,多个副本之间维持了共识算法,选择其中一个副本作为主副本,其他的副本皆为从副本。Tablet 的 DML 和强一致性查询只在其对应的日志流的主副本上进行。 + +通常情况下,每个租户在每台机器上只会有一个日志流的主副本,可能存在多个其他日志流的从副本。租户的总日志流个数取决于 Primary Zone 和 Locality 的配置。 + +日志流使用自研的 Paxos 协议将 Redo 日志在本服务器持久化,同时通过网络发送给日志流的从副本,从副本在完成各自持久化后应答主副本,主副本在确认有多数派副本都持久化成功后确认对应的 Redo 日志持久化成功。从副本利用 Redo 日志的内容实时回放,保证自己的状态与主副本一致。 + +日志流的主副本在被选举成为主后会获得租约(Lease),正常工作的主副本在租约有效期内会不停的通过选举协议延长租约期。主副本只会在租约有效时执行主的工作,租约机制保证了数据库异常处理的能力。 + +复制层能够自动应对服务器故障,保障数据库服务的持续可用。如果出现少于半数的从副本所在服务器出现问题,因为还有多于半数的副本正常工作,数据库的服务不受影响。如果主副本所在服务器出现问题,其租约会得不到延续,待其租约失效后,其他从副本会通过选举协议选举出新的主副本并授予新的租约,之后即可恢复数据库的服务。 + +均衡层 +新建表和新增分区时,系统会按照均衡原则选择合适的日志流创建 Tablet。当租户的属性发生变更,新增了机器资源,或者经过长时间使用后,Tablet 在各台机器上不再均衡时,均衡层通过日志流的分裂和合并操作,并在这个过程中配合日志流副本的移动,让数据和服务在多个服务器之间再次均衡。 + +当租户有扩容操作,获得更多服务器资源时,均衡层会将租户内已有的日志流进行分裂,并选择合适数量的 Tablet 一同分裂到新的日志流中,再将新日志流迁移到新增的服务器上,以充分利用扩容后的资源。当租户有缩容操作时,均衡层会把需要缩减的服务器上的日志流迁移到其他服务器上,并和其他服务器上已有的日志流进行合并,以缩减机器的资源占用。 + +当数据库长期使用后,随着持续创建删除表格,并且写入更多的数据,即使没有服务器资源数量变化,原本均衡的情况可能被破坏。最常见的情况是,当用户删除了一批表格后,删除的表格可能原本聚集在某一些机器上,删除后这些机器上的 Tablet 数量就变少了,应该把其他机器的 Tablet 均衡一些到这些少的机器上。均衡层会定期生成均衡计划,将 Tablet 多的服务器上日志流分裂出临时日志流并携带需要移动的 Tablet,临时日志流迁移到目的服务器后再和目的服务器上的日志流进行合并,以达成均衡的效果。 + +事务层 +事务层保证了单个日志流和多个日志流DML操作提交的原子性,也保证了并发事务之间的多版本隔离能力。 + +原子性 +一个日志流上事务的修改,即使涉及多个 Tablet,通过日志流的 write-ahead log 可以保证事务提交的原子性。事务的修改涉及多个日志流时,每个日志流会产生并持久化各自的write-ahead log,事务层通过优化的两阶段提交协议来保证提交的原子性。 + +事务层会选择一个事务修改的一个日志流产生协调者状态机,协调者会与事务修改的所有日志流通信,判断 write-ahead log 是否持久化,当所有日志流都完成持久化后,事务进入提交状态,协调者会再驱动所有日志流写下这个事务的 Commit 日志,表示事务最终的提交状态。当从副本回放或者数据库重启时,已经完成提交的事务都会通过 Commit 日志确定各自日志流事务的状态。 + +宕机重启场景下,宕机前还未完成的事务,会出现写完 write-ahead log 但是还没有Commit 日志的情况,每个日志流的 write-ahead log 都会包含事务的所有日志流列表,通过此信息可以重新确定哪个日志流是协调者并恢复协调者的状态,再次推进两阶段状态机,直到事务最终的 Commit 或 Abort 状态。 + +隔离性 +GTS 服务是一个租户内产生连续增长的时间戳的服务,其通过多副本保证可用性,底层机制与上面复制层所描述的日志流副本同步机制是一样的。 + +每个事务在提交时会从 GTS 获取一个时间戳作为事务的提交版本号并持久化在日志流的write-ahead log 中,事务内所有修改的数据都以此提交版本号标记。 + +每个语句开始时(对于 Read Committed 隔离级别)或者每个事务开始时(对于Repeatable Read 和 Serializable 隔离级别)会从 GTS 获取一个时间戳作为语句或事务的读取版本号。在读取数据时,会跳过事务版本号比读取版本号大的数据,通过这种方式为读取操作提供了统一的全局数据快照。 + +SQL 层 +SQL 层将用户的 SQL 请求转化成对一个或多个 Tablet 的数据访问。 + +SQL 层组件 +SQL 层处理一个请求的执行流程是:Parser、Resolver、Transformer、Optimizer、Code Generator、Executor。 + +Parser 负责词法/语法解析,Parser 会将用户的 SQL 分成一个个的 "Token",并根据预先设定好的语法规则解析整个请求,转换成语法树(Syntax Tree)。 + +Resolver 负责语义解析,将根据数据库元信息将 SQL 请求中的 Token 翻译成对应的对象(例如库、表、列、索引等),生成的数据结构叫做 Statement Tree。 + +Transformer 负责逻辑改写,根据内部的规则或代价模型,将 SQL 改写为与之等价的其他形式,并将其提供给后续的优化器做进一步的优化。Transformer 的工作方式是在原Statement Tree 上做等价变换,变换的结果仍然是一棵 Statement Tree。 + +Optimizer(优化器)为 SQL 请求生成最佳的执行计划,需要综合考虑 SQL 请求的语义、对象数据特征、对象物理分布等多方面因素,解决访问路径选择、联接顺序选择、联接算法选择、分布式计划生成等问题,最终生成执行计划。 + +Code Generator(代码生成器)将执行计划转换为可执行的代码,但是不做任何优化选择。 + +Executor(执行器)启动 SQL 的执行过程。 + +在标准的 SQL 流程之外,SQL 层还有 Plan Cache 能力,将历史的执行计划缓存在内存中,后续的执行可以反复执行这个计划,避免了重复查询优化的过程。配合 Fast-parser 模块,仅使用词法分析对文本串直接参数化,获取参数化后的文本及常量参数,让 SQL 直接命中 Plan Cache,加速频繁执行的 SQL。 + +多种计划 +SQL 层的执行计划分为本地、远程和分布式三种。本地执行计划只访问本服务器的数据。远程执行计划只访问非本地的一台服务器的数据。分布式计划会访问超过一台服务器的数据,执行计划会分成多个子计划在多个服务器上执行。 + +SQL 层并行化执行能力可以将执行计划分解成多个部分,由多个执行线程执行,通过一定的调度的方式,实现执行计划的并行处理。并行化执行可以充分发挥服务器 CPU 和 IO 处理能力,缩短单个查询的响应时间。并行查询技术可以用于分布式执行计划,也可以用于本地执行计划。 + +接入层 +obproxy 是 OceanBase 数据库的接入层,负责将用户的请求转发到合适的 OceanBase 实例上进行处理。 + +obproxy 是独立的进程实例,独立于 OceanBase 的数据库实例部署。obproxy 监听网络端口,兼容 MySQL 网络协议,支持使用 MySQL 驱动的应用直接连接 OceanBase。 + +obproxy 能够自动发现 OceanBase 集群的数据分布信息,对于代理的每一条 SQL 语句,会尽可能识别出语句将访问的数据,并将语句直接转发到数据所在服务器的 OceanBase 实例。 + +obproxy 有两种部署方式,一种是部署在每一个需要访问数据库的应用服务器上,另一种是部署在与 OceanBase 相同的机器上。第一种部署方式下,应用程序直接连接部署在同一台服务器上的 obproxy,所有的请求会由 obproxy 发送到合适的 OceanBase 服务器。第二种部署方式下,需要使用网络负载均衡服务将多个 obproxy 聚合成同一个对应用提供服务的入口地址。 + +OceanBase 数据库采用 Shared-Nothing 架构,各个节点之间完全对等,每个节点都有自己的 SQL 引擎、存储引擎、事务引擎,运行在普通 PC 服务器组成的集群之上,具备高可扩展性、高可用性、高性能、低成本、与主流数据库高兼容等核心特性。 + +OceanBase 数据库的一个集群由若干个节点组成。这些节点分属于若干个可用区(Zone),每个节点属于一个可用区。可用区是一个逻辑概念,表示集群内具有相似硬件可用性的一组节点,它在不同的部署模式下代表不同的含义。例如,当整个集群部署在同一个数据中心(IDC)内的时候,一个可用区的节点可以属于同一个机架,同一个交换机等。当集群分布在多个数据中心的时候,每个可用区可以对应于一个数据中心。每个可用区具有 IDC 和地域(Region)两个属性,描述该可用区所在的 IDC 及 IDC 所属的地域。一般地,地域指 IDC 所在的城市。可用区的 IDC 和 Region 属性需要反映部署时候的实际情况,以便集群内的自动容灾处理和优化策略能更好地工作。根据业务对数据库系统不同的高可用性需求,OceanBase 集群提供了多种部署模式,参见 高可用架构概述。 + +在 OceanBase 数据库中,一个表的数据可以按照某种划分规则水平拆分为多个分片,每个分片叫做一个表分区,简称分区(Partition)。某行数据属于且只属于一个分区。分区的规则由用户在建表的时候指定,包括hash、range、list等类型的分区,还支持二级分区。例如,交易库中的订单表,可以先按照用户 ID 划分为若干一级分区,再按照月份把每个一级分区划分为若干二级分区。对于二级分区表,第二级的每个子分区是一个物理分区,而第一级分区只是逻辑概念。一个表的若干个分区可以分布在一个可用区内的多个节点上。每个物理分区有一个用于存储数据的存储层对象,叫做 Tablet ,用于存储有序的数据记录。 + +当用户对 Tablet 中记录进行修改的时候,为了保证数据持久化,需要记录重做日志(REDO)到 Tablet 对应的日志流(Log Stream)里。每个日志流服务了其所在节点上的多个 Tablet。为了能够保护数据,并在节点发生故障的时候不中断服务,每个日志流及其所属的 Tablet 有多个副本。一般来说,多个副本分散在多个不同的可用区里。多个副本中有且只有一个副本接受修改操作,叫做主副本(Leader),其他副本叫做从副本(Follower)。主从副本之间通过基于 Multi-Paxos 的分布式共识协议实现了副本之间数据的一致性。当主副本所在节点发生故障的时候,一个从副本会被选举为新的主副本并继续提供服务。 + +在集群的每个节点上会运行一个叫做 observer 的服务进程,它内部包含多个操作系统线程。节点的功能都是对等的。每个服务负责自己所在节点上分区数据的存取,也负责路由到本机的 SQL 语句的解析和执行。这些服务进程之间通过 TCP/IP 协议进行通信。同时,每个服务会监听来自外部应用的连接请求,建立连接和数据库会话,并提供数据库服务。关于 observer 服务进程的更多信息,参见 线程简介。 + +为了简化大规模部署多个业务数据库的管理并降低资源成本,OceanBase 数据库提供了独特的多租户特性。在一个 OceanBase 集群内,可以创建很多个互相之间隔离的数据库"实例",叫做一个租户。从应用程序的视角来看,每个租户是一个独立的数据库。不仅如此,每个租户可以选择 MySQL 或 Oracle 兼容模式。应用连接到 MySQL 租户后,可以在租户下创建用户、database,与一个独立的 MySQL 库的使用体验是一样的。同样的,应用连接到 Oracle 租户后,可以在租户下创建 schema、管理角色等,与一个独立的 Oracle 库的使用体验是一样的。一个新的集群初始化之后,就会存在一个特殊的名为 sys 的租户,叫做系统租户。系统租户中保存了集群的元数据,是一个 MySQL 兼容模式的租户。 + +为了隔离租户的资源,每个 observer 进程内可以有多个属于不同租户的虚拟容器,叫做资源单元(UNIT)。每个租户在多个节点上的资源单元组成一个资源池。资源单元包括 CPU 和内存资源。 + +为了使 OceanBase 数据库对应用程序屏蔽内部分区和副本分布等细节,使应用访问分布式数据库像访问单机数据库一样简单,我们提供了 obproxy 代理服务。应用程序并不会直接与 OBServer 建立连接,而是连接obproxy,然后由 obproxy 转发 SQL 请求到合适的 OBServer 节点。obproxy 是无状态的服务,多个 obproxy 节点通过网络负载均衡(SLB)对应用提供统一的网络地址。 + + +OceanBase 数据库是随着阿里巴巴电商业务的发展孕育而生,随着蚂蚁集团移动支付业务的发展而壮大,经过十多年各类业务的使用和打磨才终于破茧成蝶,推向了外部市场。本章节简述 OceanBase 数据库发展过程中一些里程碑意义的事件。 + +诞生 + +2010 年,OceanBase 创始人阳振坤博士带领初创团队启动了 OceanBase 项目。第一个应用是淘宝的收藏夹业务。如今收藏夹依然是 OceanBase 的客户。收藏夹单表数据量非常大,OceanBase 用独创的方法解决了其高并发的大表连接小表的需求。 + +关系数据库 + +早期的版本中,应用通过定制的 API 库访问 OceanBase 数据库。2012 年,OceanBase 数据库发布了支持 SQL 的版本,初步成为一个功能完整的通用关系数据库。 + +初试金融业务 + +OceanBase 进入支付宝(后来的蚂蚁集团),开始应用于金融级的业务场景。2014 年"双 11"大促活动,OceanBase 开始承担交易库部分流量。此后,新成立的网商银行把所有核心交易库都运行在 OceanBase 数据库上。 + +金融级核心库 + +2016 年,OceanBase 数据库发布了架构重新设计后的 1.0 版本,支持了分布式事务,提升了高并发写业务中的扩展,同时实现了多租户架构,这个整体架构延续至今。同时,到 2016 年"双 11"时,支付宝全部核心库的业务流量 100% 运行在 OceanBase 数据库上,包括交易、支付、会员和最重要的账务库。 + +走向外部市场 + +2017 年,OceanBase 数据库开始试点外部业务,成功应用于南京银行。 + +商业化加速 + +2018 年,OceanBase 数据库发布 2.0 版本,开始支持 Oracle 兼容模式。这一特性降低应用改造适配成本,在外部客户中快速推广开来。 + +勇攀高峰 + +2019 年,OceanBase 数据库 V2.2 版本参加代表 OLTP 数据库最权威的 TPC-C 评测,以 6000 万 tpmC 的成绩登顶世界第一。随后,在 2020 年,又以 7 亿 tpmC 刷新纪录,截止目前依然稳居第一。这充分证明了 OceanBase 数据库优秀的扩展性和稳定性。OceanBase 数据库是第一个也是截止目前唯一一个上榜 TPC-C 的中国数据库产品。 + +HTAP 混合负载 + +2021 年,OceanBase 数据库 V3.0 基于全新的向量化执行引擎,在 TPC-H 30000GB 的评测中以 1526 万 QphH 的成绩刷新了评测榜单。这标志着 OceanBase 数据库一套引擎处理 AP 和 TP 混合负载的能力取得了基础性的突破。 + +开源开放 + +2021 年六一儿童节,OceanBase 数据库宣布全面开源,开放合作,共建生态。 + +OceanBase 数据库采用了单集群多租户设计,天然支持云数据库架构,支持公有云、私有云、混合云等多种部署形式。 + +架构 + +OceanBase 数据库通过租户实现资源隔离,让每个数据库服务的实例不感知其他实例的存在,并通过权限控制确保租户数据的安全性,配合 OceanBase 数据库强大的可扩展性,能够提供安全、灵活的 DBaaS 服务。 + +租户是一个逻辑概念。在 OceanBase 数据库中,租户是资源分配的单位,是数据库对象管理和资源管理的基础,对于系统运维,尤其是对于云数据库的运维有着重要的影响。租户在一定程度上相当于传统数据库的"实例"概念。租户之间是完全隔离的。在数据安全方面,OceanBase 数据库不允许跨租户的数据访问,以确保用户的数据资产没有被其他租户窃取的风险。在资源使用方面,OceanBase 数据库表现为租户"独占"其资源配额。总体上来说,租户(tenant)既是各类数据库对象的容器,又是资源(CPU、Memory、IO 等)的容器。 + +OceanBase 数据库在一个系统中可同时支持 MySQL 模式和 Oracle 模式两种模式的租户。用户在创建租户时,可选择创建 MySQL 兼容模式的租户或 Oracle 兼容模式的租户,租户的兼容模式一经确定就无法更改,所有数据类型、SQL 功能、视图等相应地与 MySQL 数据库或 Oracle 数据库保持一致。 + + +MySQL 模式 +MySQL 模式是为降低 MySQL 数据库迁移至 OceanBase 数据库所引发的业务系统改造成本,同时使业务数据库设计人员、开发人员、数据库管理员等可复用积累的 MySQL 数据库技术知识经验,并能快速上手 OceanBase 数据库而支持的一种租户类型功能。OceanBase 数据库的 MySQL 模式兼容 MySQL 5.7 的绝大部分功能和语法,兼容 MySQL 5.7 版本的全量以及 8.0 版本的部分 JSON 函数,基于 MySQL 的应用能够平滑迁移。 + +Oracle 模式 +OceanBase 数据库从 V2.x.x 版本开始支持 Oracle 兼容模式。Oracle 模式是为降低 Oracle 数据库迁移 OceanBase 数据库的业务系统改造成本,同时使业务数据库设计开发人员、数据库管理员等可复用积累的 Oracle 数据库技术知识经验,并能快速上手 OceanBase 数据库而支持的一种租户类型功能。Oracle 模式目前能够支持绝大部分的 Oracle 语法和过程性语言功能,可以做到大部分的 Oracle 业务进行少量修改后的自动迁移。 + +OceanBase 数据库是多租户架构。在 V4.0.0 版本之前,仅支持两种类型的租户:系统租户和用户租户。从 V4.0.0 版本开始,引入了 Meta 租户概念。因此,当前版本对用户可见的租户有三种类型:系统租户、用户租户以及 Meta 租户。 + +系统租户 +系统租户是集群默认创建的租户,与集群的生命周期一致,负责管理集群和所有租户的生命周期。系统租户仅有一个 1 号日志流,仅支持单点写入,不具备扩展能力。 + +系统租户可以创建用户表,所有的用户表和系统表数据均由 1 号日志流服务。系统租户的数据是集群私有的,不支持主备集群物理同步和物理备份恢复。 + +用户租户 +用户租户是由用户创建的租户,对外提供完整的数据库功能,支持 MySQL 和 Oracle 两种兼容模式。用户租户支持服务能力水平扩展到多台机器上,支持动态扩容和缩容,内部会根据用户的配置自动创建和删除日志流。 + +用户租户的数据有更强的数据保护和可用性要求,支持跨集群物理同步和物理备份恢复,典型数据包括:Schema 数据、用户表数据及事务数据等。 +Meta 租户 +Meta 租户是 OceanBase 数据库内部自管理的租户,每创建一个用户租户系统就会自动创建一个对应的 Meta 租户,其生命期与用户租户保持一致。 + +Meta 租户用于存储和管理用户租户的集群私有数据,这部分数据不需要进行跨库物理同步以及物理备份恢复,这些数据包括:配置项、位置信息、副本信息、日志流状态、备份恢复相关信息、合并信息等。 + +租户对比 +从用户角度来看,系统租户、用户租户和 Meta 租户的差异性如下表所示。 +OceanBase 数据库是多租户的数据库系统,一个集群内可包含多个相互独立的租户,每个租户提供独立的数据库服务。在 OceanBase 数据库中,使用资源配置(unit_config)、资源池(Resource Pool)和资源单元(Unit)三个概念,对各租户的可用资源进行管理。 + + +创建租户前,需首先确定租户的资源配置、使用资源范围等。租户创建的通用流程如下: + +资源配置是描述资源池的配置信息,用来描述资源池中每个资源单元可用的 CPU、内存、存储空间和 IOPS 等的规格。修改资源配置可动态调整资源单元的规格。这里需要注意,资源配置指定的是对应资源单元能够提供的服务能力,而不是资源单元的实时负载。 创建资源配置的示例语句如下: + +问题: +请你基于上述内容对 OceanBase 的介绍进行总结,不少于2000字。 \ No newline at end of file diff --git a/pilot/configs/model_config.py b/pilot/configs/model_config.py index cedfa8554..fec343f2a 100644 --- a/pilot/configs/model_config.py +++ b/pilot/configs/model_config.py @@ -2,6 +2,7 @@ # -*- coding:utf-8 -*- import os +from functools import cache ROOT_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) MODEL_PATH = os.path.join(ROOT_PATH, "models") @@ -22,6 +23,7 @@ os.chdir(new_directory) +@cache def get_device() -> str: try: import torch diff --git a/pilot/model/base.py b/pilot/model/base.py index 48480b94b..d54ac6d57 100644 --- a/pilot/model/base.py +++ b/pilot/model/base.py @@ -3,7 +3,9 @@ from enum import Enum from typing import TypedDict, Optional, Dict, List, Any + from dataclasses import dataclass, asdict +import time from datetime import datetime from pilot.utils.parameter_utils import ParameterDescription @@ -47,6 +49,79 @@ class WorkerApplyType(str, Enum): UPDATE_PARAMS = "update_params" +@dataclass +class ModelInferenceMetrics: + """A class to represent metrics for assessing the inference performance of a LLM.""" + + start_time_ms: Optional[int] = None + """The timestamp (in milliseconds) when the model inference starts.""" + + end_time_ms: Optional[int] = None + """The timestamp (in milliseconds) when the model inference ends.""" + + current_time_ms: Optional[int] = None + """The current timestamp (in milliseconds) when the model inference return partially output(stream).""" + + first_token_time_ms: Optional[int] = None + """The timestamp (in milliseconds) when the first token is generated.""" + + first_completion_time_ms: Optional[int] = None + """The timestamp (in milliseconds) when the first completion is generated.""" + + first_completion_tokens: Optional[int] = None + """The number of tokens when the first completion is generated.""" + + prompt_tokens: Optional[int] = None + """The number of tokens in the input prompt.""" + + completion_tokens: Optional[int] = None + """The number of tokens in the generated completion.""" + + total_tokens: Optional[int] = None + """The total number of tokens (prompt plus completion).""" + + speed_per_second: Optional[float] = None + """The average number of tokens generated per second.""" + + @staticmethod + def create_metrics( + last_metrics: Optional["ModelInferenceMetrics"] = None, + ) -> "ModelInferenceMetrics": + start_time_ms = last_metrics.start_time_ms if last_metrics else None + first_token_time_ms = last_metrics.first_token_time_ms if last_metrics else None + first_completion_time_ms = ( + last_metrics.first_completion_time_ms if last_metrics else None + ) + first_completion_tokens = ( + last_metrics.first_completion_tokens if last_metrics else None + ) + prompt_tokens = last_metrics.prompt_tokens if last_metrics else None + completion_tokens = last_metrics.completion_tokens if last_metrics else None + total_tokens = last_metrics.total_tokens if last_metrics else None + speed_per_second = last_metrics.speed_per_second if last_metrics else None + + if not start_time_ms: + start_time_ms = time.time_ns() // 1_000_000 + current_time_ms = time.time_ns() // 1_000_000 + end_time_ms = current_time_ms + + return ModelInferenceMetrics( + start_time_ms=start_time_ms, + end_time_ms=end_time_ms, + current_time_ms=current_time_ms, + first_token_time_ms=first_token_time_ms, + first_completion_time_ms=first_completion_time_ms, + first_completion_tokens=first_completion_tokens, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + speed_per_second=speed_per_second, + ) + + def to_dict(self) -> Dict: + return asdict(self) + + @dataclass class ModelOutput: text: str @@ -54,6 +129,9 @@ class ModelOutput: model_context: Dict = None finish_reason: str = None usage: Dict[str, Any] = None + metrics: Optional[ModelInferenceMetrics] = None + + """Some metrics for model inference""" def to_dict(self) -> Dict: return asdict(self) diff --git a/pilot/model/cluster/base.py b/pilot/model/cluster/base.py index 36c4779b8..45e46ab3e 100644 --- a/pilot/model/cluster/base.py +++ b/pilot/model/cluster/base.py @@ -21,6 +21,8 @@ class PromptRequest(BaseModel): context_len: int = None echo: bool = True span_id: str = None + metrics: bool = False + """Whether to return metrics of inference""" class EmbeddingsRequest(BaseModel): diff --git a/pilot/model/cluster/worker/default_worker.py b/pilot/model/cluster/worker/default_worker.py index d6663fc9f..c798e3075 100644 --- a/pilot/model/cluster/worker/default_worker.py +++ b/pilot/model/cluster/worker/default_worker.py @@ -1,10 +1,13 @@ import os import logging + from typing import Dict, Iterator, List, Optional +import time +import traceback from pilot.configs.model_config import get_device from pilot.model.model_adapter import get_llm_model_adapter, LLMModelAdaper -from pilot.model.base import ModelOutput +from pilot.model.base import ModelOutput, ModelInferenceMetrics from pilot.model.loader import ModelLoader, _get_model_real_path from pilot.model.parameter import ModelParameters from pilot.model.cluster.worker_base import ModelWorker @@ -144,14 +147,29 @@ def generate_stream(self, params: Dict) -> Iterator[ModelOutput]: ) previous_response = "" + last_metrics = ModelInferenceMetrics.create_metrics() + is_first_generate = True + context_len = params.get("context_len") or self.context_len for output in generate_stream_func( self.model, self.tokenizer, params, get_device(), context_len ): - model_output, incremental_output, output_str = self._handle_output( - output, previous_response, model_context + ( + model_output, + incremental_output, + output_str, + current_metrics, + ) = self._handle_output( + output, + previous_response, + model_context, + last_metrics, + is_first_generate, ) + if is_first_generate: + is_first_generate = False previous_response = output_str + last_metrics = current_metrics yield model_output print( f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}" @@ -191,13 +209,28 @@ async def async_generate_stream(self, params: Dict) -> Iterator[ModelOutput]: previous_response = "" context_len = params.get("context_len") or self.context_len + last_metrics = ModelInferenceMetrics.create_metrics() + is_first_generate = True async for output in generate_stream_func( self.model, self.tokenizer, params, get_device(), context_len ): - model_output, incremental_output, output_str = self._handle_output( - output, previous_response, model_context + ( + model_output, + incremental_output, + output_str, + current_metrics, + ) = self._handle_output( + output, + previous_response, + model_context, + last_metrics, + is_first_generate, ) + if is_first_generate: + is_first_generate = False + previous_response = output_str + last_metrics = current_metrics yield model_output print( f"\n\nfull stream output:\n{previous_response}\n\nmodel generate_stream params:\n{params}" @@ -262,7 +295,14 @@ def _prepare_generate_stream(self, params: Dict, span_operation_name: str): return params, model_context, generate_stream_func, model_span - def _handle_output(self, output, previous_response, model_context): + def _handle_output( + self, + output, + previous_response, + model_context, + last_metrics: ModelInferenceMetrics, + is_first_generate: bool, + ): finish_reason = None usage = None if isinstance(output, dict): @@ -273,14 +313,17 @@ def _handle_output(self, output, previous_response, model_context): logger.info(f"finish_reason: {finish_reason}") incremental_output = output[len(previous_response) :] print(incremental_output, end="", flush=True) + + metrics = _new_metrics_from_model_output(last_metrics, is_first_generate, usage) model_output = ModelOutput( text=output, error_code=0, model_context=model_context, finish_reason=finish_reason, usage=usage, + metrics=metrics, ) - return model_output, incremental_output, output + return model_output, incremental_output, output, metrics def _handle_exception(self, e): # Check if the exception is a torch.cuda.CudaError and if torch was imported. @@ -289,6 +332,8 @@ def _handle_exception(self, e): text="**GPU OutOfMemory, Please Refresh.**", error_code=1 ) else: + msg = traceback.format_exc() + logger.error(f"Model inference error, detail: {msg}") model_output = ModelOutput( text=f"**LLMServer Generate Error, Please CheckErrorInfo.**: {e}", error_code=1, @@ -310,3 +355,49 @@ def _parse_model_max_length(model, tokenizer) -> Optional[int]: return model_config.max_position_embeddings except Exception: return None + + +def _new_metrics_from_model_output( + last_metric: ModelInferenceMetrics, + is_first_generate: bool, + usage: Optional[Dict] = None, +) -> ModelInferenceMetrics: + metrics = ModelInferenceMetrics.create_metrics(last_metric) + if is_first_generate: + logger.info(f"is_first_generate, usage: {usage}") + metrics.first_completion_time_ms = time.time_ns() // 1_000_000 + + if not usage or not isinstance(usage, dict): + return metrics + prompt_tokens = usage.get("prompt_tokens") + completion_tokens = usage.get("completion_tokens") + total_tokens = usage.get("total_tokens") + + if prompt_tokens is None: + prompt_tokens = metrics.prompt_tokens + if completion_tokens is None: + completion_tokens = metrics.completion_tokens + if total_tokens is None: + total_tokens = metrics.total_tokens + + if is_first_generate and (completion_tokens is not None): + # completion_tokens == 0 is prefill + metrics.first_completion_tokens = completion_tokens + if completion_tokens == 1: + metrics.first_token_time_ms = metrics.first_completion_time_ms + + if prompt_tokens: + metrics.prompt_tokens = prompt_tokens + if completion_tokens: + metrics.completion_tokens = completion_tokens + if total_tokens: + metrics.total_tokens = total_tokens + elif prompt_tokens and completion_tokens: + total_tokens = prompt_tokens + completion_tokens + metrics.total_tokens = total_tokens + + if total_tokens: + # time cost(seconds) + duration = (metrics.current_time_ms - metrics.start_time_ms) / 1000.0 + metrics.speed_per_second = total_tokens / duration + return metrics diff --git a/pilot/model/cluster/worker/manager.py b/pilot/model/cluster/worker/manager.py index 2dd402920..9c1c4f7b3 100644 --- a/pilot/model/cluster/worker/manager.py +++ b/pilot/model/cluster/worker/manager.py @@ -1000,11 +1000,16 @@ def run_worker_manager( embedding_model_name: str = None, embedding_model_path: str = None, start_listener: Callable[["WorkerManager"], None] = None, + **kwargs, ): global worker_manager worker_params: ModelWorkerParameters = _parse_worker_params( - model_name=model_name, model_path=model_path, standalone=standalone, port=port + model_name=model_name, + model_path=model_path, + standalone=standalone, + port=port, + **kwargs, ) setup_logging( diff --git a/pilot/model/llm_out/vllm_llm.py b/pilot/model/llm_out/vllm_llm.py index 07d43dc74..de108c87c 100644 --- a/pilot/model/llm_out/vllm_llm.py +++ b/pilot/model/llm_out/vllm_llm.py @@ -1,9 +1,13 @@ from typing import Dict +import os from vllm import AsyncLLMEngine from vllm.utils import random_uuid from vllm.sampling_params import SamplingParams +_IS_BENCHMARK = os.getenv("DB_GPT_MODEL_BENCHMARK", "False").lower() == "true" + + async def generate_stream( model: AsyncLLMEngine, tokenizer, params: Dict, device: str, context_len: int ): @@ -37,15 +41,29 @@ async def generate_stream( top_p = max(top_p, 1e-5) if temperature <= 1e-5: top_p = 1.0 + gen_params = { + "stop": list(stop), + "ignore_eos": False, + } + prompt_token_ids = None + if _IS_BENCHMARK: + gen_params["stop"] = [] + gen_params["ignore_eos"] = True + prompt_len = context_len - max_new_tokens - 2 + prompt_token_ids = tokenizer([prompt]).input_ids[0] + prompt_token_ids = prompt_token_ids[-prompt_len:] sampling_params = SamplingParams( n=1, temperature=temperature, top_p=top_p, use_beam_search=False, - stop=list(stop), max_tokens=max_new_tokens, + **gen_params + ) + + results_generator = model.generate( + prompt, sampling_params, request_id, prompt_token_ids=prompt_token_ids ) - results_generator = model.generate(prompt, sampling_params, request_id) async for request_output in results_generator: prompt = request_output.prompt if echo: @@ -53,4 +71,25 @@ async def generate_stream( else: text_outputs = [output.text for output in request_output.outputs] text_outputs = " ".join(text_outputs) - yield {"text": text_outputs, "error_code": 0, "usage": {}} + + # Note: usage is not supported yet + prompt_tokens = len(request_output.prompt_token_ids) + completion_tokens = sum( + len(output.token_ids) for output in request_output.outputs + ) + usage = { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + } + finish_reason = ( + request_output.outputs[0].finish_reason + if len(request_output.outputs) == 1 + else [output.finish_reason for output in request_output.outputs] + ) + yield { + "text": text_outputs, + "error_code": 0, + "usage": usage, + "finish_reason": finish_reason, + } diff --git a/pilot/model/model_adapter.py b/pilot/model/model_adapter.py index 8fd242882..5f328c554 100644 --- a/pilot/model/model_adapter.py +++ b/pilot/model/model_adapter.py @@ -39,7 +39,7 @@ logger = logging.getLogger(__name__) thread_local = threading.local() - +_IS_BENCHMARK = os.getenv("DB_GPT_MODEL_BENCHMARK", "False").lower() == "true" _OLD_MODELS = [ "llama-cpp", @@ -228,9 +228,16 @@ def load(self, model_path: str, from_pretrained_kwargs: dict): return self._adapter.load_model(model_path, from_pretrained_kwargs) def get_generate_stream_function(self, model: "TorchNNModule", model_path: str): - from fastchat.model.model_adapter import get_generate_stream_function + if _IS_BENCHMARK: + from pilot.utils.benchmarks.llm.fastchat_benchmarks_inference import ( + generate_stream, + ) + + return generate_stream + else: + from fastchat.model.model_adapter import get_generate_stream_function - return get_generate_stream_function(model, model_path) + return get_generate_stream_function(model, model_path) def get_default_conv_template( self, model_name: str, model_path: str diff --git a/pilot/scene/base_chat.py b/pilot/scene/base_chat.py index 9a19f3255..7a8a37cae 100644 --- a/pilot/scene/base_chat.py +++ b/pilot/scene/base_chat.py @@ -635,7 +635,7 @@ def _build_model_operator( model_task_name="llm_model_node", cache_task_name="llm_model_cache_node", ) - # Create a join node to merge outputs from the model and cache nodes, just keep the fist not empty output + # Create a join node to merge outputs from the model and cache nodes, just keep the first not empty output join_node = JoinOperator( combine_function=lambda model_out, cache_out: cache_out or model_out ) diff --git a/pilot/utils/benchmarks/__init__.py b/pilot/utils/benchmarks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pilot/utils/benchmarks/llm/__init__.py b/pilot/utils/benchmarks/llm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pilot/utils/benchmarks/llm/fastchat_benchmarks_inference.py b/pilot/utils/benchmarks/llm/fastchat_benchmarks_inference.py new file mode 100644 index 000000000..cb05ab33f --- /dev/null +++ b/pilot/utils/benchmarks/llm/fastchat_benchmarks_inference.py @@ -0,0 +1,296 @@ +""" +Adapted from fastchat: https://github.com/lm-sys/FastChat/blob/main/fastchat/serve/inference.py. +For benchmarks. + +""" +import gc +from typing import Iterable, Dict + +import torch +from transformers.generation.logits_process import ( + LogitsProcessorList, + RepetitionPenaltyLogitsProcessor, + TemperatureLogitsWarper, + TopKLogitsWarper, + TopPLogitsWarper, +) + + +from fastchat.utils import is_partial_stop, is_sentence_complete, get_context_length + + +def prepare_logits_processor( + temperature: float, repetition_penalty: float, top_p: float, top_k: int +) -> LogitsProcessorList: + processor_list = LogitsProcessorList() + # TemperatureLogitsWarper doesn't accept 0.0, 1.0 makes it a no-op so we skip two cases. + if temperature >= 1e-5 and temperature != 1.0: + processor_list.append(TemperatureLogitsWarper(temperature)) + if repetition_penalty > 1.0: + processor_list.append(RepetitionPenaltyLogitsProcessor(repetition_penalty)) + if 1e-8 <= top_p < 1.0: + processor_list.append(TopPLogitsWarper(top_p)) + if top_k > 0: + processor_list.append(TopKLogitsWarper(top_k)) + return processor_list + + +@torch.inference_mode() +def generate_stream( + model, + tokenizer, + params: Dict, + device: str, + context_len: int, + stream_interval: int = 2, + judge_sent_end: bool = False, +): + if hasattr(model, "device"): + device = model.device + + # Read parameters + prompt = params["prompt"] + len_prompt = len(prompt) + temperature = float(params.get("temperature", 1.0)) + repetition_penalty = float(params.get("repetition_penalty", 1.0)) + top_p = float(params.get("top_p", 1.0)) + top_k = int(params.get("top_k", -1)) # -1 means disable + max_new_tokens = int(params.get("max_new_tokens", 256)) + logprobs = params.get("logprobs", None) # FIXME: Support logprobs>1. + echo = bool(params.get("echo", True)) + stop_str = params.get("stop", None) + stop_token_ids = params.get("stop_token_ids", None) or [] + if tokenizer.eos_token_id not in stop_token_ids: + stop_token_ids.append(tokenizer.eos_token_id) + + logits_processor = prepare_logits_processor( + temperature, repetition_penalty, top_p, top_k + ) + input_ids = tokenizer(prompt).input_ids + + if model.config.is_encoder_decoder: + max_src_len = context_len + else: # truncate + max_src_len = context_len - max_new_tokens - 1 + + input_ids = input_ids[-max_src_len:] + output_ids = list(input_ids) + input_echo_len = len(input_ids) + + # Don't stop generate until max_new_tokens is reached. + stop_token_ids = [] + stop_str = None + + if model.config.is_encoder_decoder: + if logprobs is not None: # FIXME: Support logprobs for encoder-decoder models. + raise NotImplementedError + encoder_output = model.encoder( + input_ids=torch.as_tensor([input_ids], device=device) + )[0] + start_ids = torch.as_tensor( + [[model.generation_config.decoder_start_token_id]], + dtype=torch.int64, + device=device, + ) + else: + start_ids = torch.as_tensor([input_ids], device=device) + + past_key_values = out = None + token_logprobs = [None] # The first token has no logprobs. + sent_interrupt = False + finish_reason = None + for i in range(max_new_tokens): + if i == 0: # prefill + if model.config.is_encoder_decoder: + out = model.decoder( + input_ids=start_ids, + encoder_hidden_states=encoder_output, + use_cache=True, + ) + logits = model.lm_head(out[0]) + else: + out = model(input_ids=start_ids, use_cache=True) + logits = out.logits + past_key_values = out.past_key_values + + if logprobs is not None: + # Prefull logprobs for the prompt. + shift_input_ids = start_ids[..., 1:].contiguous() + shift_logits = logits[..., :-1, :].contiguous() + shift_logits = torch.log_softmax(shift_logits, dim=-1).tolist() + for label_id, logit in zip( + shift_input_ids[0].tolist(), shift_logits[0] + ): + token_logprobs.append(logit[label_id]) + else: # decoding + if model.config.is_encoder_decoder: + out = model.decoder( + input_ids=torch.as_tensor( + [[token] if not sent_interrupt else output_ids], + device=device, + ), + encoder_hidden_states=encoder_output, + use_cache=True, + past_key_values=past_key_values if not sent_interrupt else None, + ) + sent_interrupt = False + + logits = model.lm_head(out[0]) + else: + out = model( + input_ids=torch.as_tensor( + [[token] if not sent_interrupt else output_ids], + device=device, + ), + use_cache=True, + past_key_values=past_key_values if not sent_interrupt else None, + ) + sent_interrupt = False + logits = out.logits + past_key_values = out.past_key_values + + if logits_processor: + if repetition_penalty > 1.0: + tmp_output_ids = torch.as_tensor([output_ids], device=logits.device) + else: + tmp_output_ids = None + last_token_logits = logits_processor(tmp_output_ids, logits[:, -1, :])[0] + else: + last_token_logits = logits[0, -1, :] + + if device == "mps": + # Switch to CPU by avoiding some bugs in mps backend. + last_token_logits = last_token_logits.float().to("cpu") + + if temperature < 1e-5 or top_p < 1e-8: # greedy + _, indices = torch.topk(last_token_logits, 2) + tokens = [int(index) for index in indices.tolist()] + else: + probs = torch.softmax(last_token_logits, dim=-1) + indices = torch.multinomial(probs, num_samples=2) + tokens = [int(token) for token in indices.tolist()] + token = tokens[0] + output_ids.append(token) + if logprobs is not None: + # Cannot use last_token_logits because logprobs is based on raw logits. + token_logprobs.append( + torch.log_softmax(logits[0, -1, :], dim=-1)[token].tolist() + ) + + if token in stop_token_ids: + stopped = True + else: + stopped = False + + # Yield the output tokens + if i % stream_interval == 0 or i == max_new_tokens - 1 or stopped: + if echo: + tmp_output_ids = output_ids + rfind_start = len_prompt + else: + tmp_output_ids = output_ids[input_echo_len:] + rfind_start = 0 + + output = tokenizer.decode( + tmp_output_ids, + skip_special_tokens=True, + spaces_between_special_tokens=False, + clean_up_tokenization_spaces=True, + ) + ret_logprobs = None + if logprobs is not None: + ret_logprobs = { + "text_offset": [], + "tokens": [ + tokenizer.decode(token) + for token in ( + output_ids if echo else output_ids[input_echo_len:] + ) + ], + "token_logprobs": token_logprobs + if echo + else token_logprobs[input_echo_len:], + "top_logprobs": [{}] + * len(token_logprobs if echo else token_logprobs[input_echo_len:]), + } + # Compute text_offset + curr_pos = 0 + for text in ret_logprobs["tokens"]: + ret_logprobs["text_offset"].append(curr_pos) + curr_pos += len(text) + + # TODO: For the issue of incomplete sentences interrupting output, apply a patch and others can also modify it to a more elegant way + if judge_sent_end and stopped and not is_sentence_complete(output): + if len(tokens) > 1: + token = tokens[1] + output_ids[-1] = token + else: + output_ids.pop() + stopped = False + sent_interrupt = True + + partially_stopped = False + if stop_str: + if isinstance(stop_str, str): + pos = output.rfind(stop_str, rfind_start) + if pos != -1: + output = output[:pos] + stopped = True + else: + partially_stopped = is_partial_stop(output, stop_str) + elif isinstance(stop_str, Iterable): + for each_stop in stop_str: + pos = output.rfind(each_stop, rfind_start) + if pos != -1: + output = output[:pos] + stopped = True + break + else: + partially_stopped = is_partial_stop(output, each_stop) + if partially_stopped: + break + else: + raise ValueError("Invalid stop field type.") + + # Prevent yielding partial stop sequence + if not partially_stopped: + yield { + "text": output, + "logprobs": ret_logprobs, + "usage": { + "prompt_tokens": input_echo_len, + "completion_tokens": i, + "total_tokens": input_echo_len + i, + }, + "finish_reason": None, + } + + if stopped: + break + + # Finish stream event, which contains finish reason + else: + finish_reason = "length" + + if stopped: + finish_reason = "stop" + + yield { + "text": output, + "logprobs": ret_logprobs, + "usage": { + "prompt_tokens": input_echo_len, + "completion_tokens": i, + "total_tokens": input_echo_len + i, + }, + "finish_reason": finish_reason, + } + + # Clean + del past_key_values, out + gc.collect() + torch.cuda.empty_cache() + if device == "xpu": + torch.xpu.empty_cache() + if device == "npu": + torch.npu.empty_cache() diff --git a/pilot/utils/benchmarks/llm/llm_benchmarks.py b/pilot/utils/benchmarks/llm/llm_benchmarks.py new file mode 100644 index 000000000..b70742670 --- /dev/null +++ b/pilot/utils/benchmarks/llm/llm_benchmarks.py @@ -0,0 +1,243 @@ +from typing import Dict, List +import asyncio +import os +import sys +import time +import csv +import argparse +import logging +import traceback +from pilot.configs.model_config import ROOT_PATH, LLM_MODEL_CONFIG + +from pilot.model.cluster.worker.manager import ( + run_worker_manager, + initialize_worker_manager_in_client, + worker_manager, + WorkerManager, +) + +from pilot.model.base import ModelOutput, ModelInferenceMetrics +from pilot.model.cluster import PromptRequest +from pilot.scene.base_message import ModelMessage, ModelMessageRoleType + + +model_name = "vicuna-7b-v1.5" +model_path = LLM_MODEL_CONFIG[model_name] +# or vllm +model_type = "huggingface" + +controller_addr = "http://127.0.0.1:5000" + +result_csv_file = None + +parallel_nums = [1, 2, 4, 16, 32] +# parallel_nums = [1, 2, 4] + + +def get_result_csv_file() -> str: + return os.path.join( + ROOT_PATH, f"pilot/data/{model_name}_{model_type}_benchmarks_llm.csv" + ) + + +input_lens = [64, 64] +output_lens = [256, 512] + + +prompt_file_map = { + "11k": os.path.join( + ROOT_PATH, "docker/examples/benchmarks/benchmarks_llm_11k_prompt.txt" + ) +} + +METRICS_HEADERS = [ + # Params + "model_name", + "parallel_nums", + "input_length", + "output_length", + # Merge parallel result + "test_time_cost_ms", + "test_total_tokens", + "test_speed_per_second", # (tokens / s) + # Detail for each task + "start_time_ms", + "end_time_ms", + "current_time_ms", + "first_token_time_ms", + "first_completion_time_ms", + "first_completion_tokens", + "prompt_tokens", + "completion_tokens", + "total_tokens", + "speed_per_second", +] + + +def read_prompt_from_file(file_key: str) -> str: + full_path = prompt_file_map[file_key] + with open(full_path, "r+", encoding="utf-8") as f: + return f.read() + + +def build_param( + input_len: int, + output_len: int, + user_input: str, + system_prompt: str = None, +) -> Dict: + hist = [] + if system_prompt is not None: + hist.append( + ModelMessage(role=ModelMessageRoleType.SYSTEM, content=system_prompt) + ) + hist.append(ModelMessage(role=ModelMessageRoleType.HUMAN, content=user_input)) + hist = list(h.dict() for h in hist) + context_len = input_len + output_len + 2 + params = { + "prompt": user_input, + "messages": hist, + "model": model_name, + "echo": False, + "max_new_tokens": output_len, + "context_len": context_len, + } + return params + + +async def run_batch( + wh, input_len: int, output_len: int, parallel_num: int, output_file: str +): + tasks = [] + prompt = read_prompt_from_file("11k") + if model_type == "vllm": + max_input_str_len = input_len + if "baichuan" in model_name: + # TODO prompt handle first + max_input_str_len *= 2 + prompt = prompt[-max_input_str_len:] + + for _ in range(parallel_num): + params = build_param(input_len, output_len, prompt, system_prompt="") + tasks.append(wh.generate(params)) + print( + f"Begin run benchmarks, model name: {model_name}, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}" + ) + start_time_ms = time.time_ns() // 1_000_000 + results: List[ModelOutput] = await asyncio.gather(*tasks) + end_time_ms = time.time_ns() // 1_000_000 + + test_time_cost_ms = end_time_ms - start_time_ms + test_total_tokens = 0 + rows = [] + for r in results: + metrics = r.metrics + if isinstance(metrics, dict): + metrics = ModelInferenceMetrics(**metrics) + print(r) + test_total_tokens += metrics.total_tokens + row_data = metrics.to_dict() + rows.append(row_data) + test_speed_per_second = test_total_tokens / (test_time_cost_ms / 1000.0) + + with open(output_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=METRICS_HEADERS) + if f.tell() == 0: + # Fist time + writer.writeheader() + for row in rows: + row["model_name"] = model_name + row["parallel_nums"] = parallel_num + row["input_length"] = input_len + row["output_length"] = output_len + row["test_time_cost_ms"] = test_time_cost_ms + row["test_total_tokens"] = test_total_tokens + row["test_speed_per_second"] = test_speed_per_second + writer.writerow(row) + print( + f"input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, save result to {output_file}" + ) + + +async def run_model(wh: WorkerManager) -> None: + global result_csv_file + if not result_csv_file: + result_csv_file = get_result_csv_file() + if os.path.exists(result_csv_file): + os.rename(result_csv_file, f"{result_csv_file}.bak.csv") + for parallel_num in parallel_nums: + for input_len, output_len in zip(input_lens, output_lens): + try: + await run_batch( + wh, input_len, output_len, parallel_num, result_csv_file + ) + except Exception: + msg = traceback.format_exc() + logging.error( + f"Run benchmarks error, input_len: {input_len}, output_len: {output_len}, parallel_num: {parallel_num}, error message: {msg}" + ) + + sys.exit(0) + + +def startup_llm_env(): + from fastapi import FastAPI + + app = FastAPI() + initialize_worker_manager_in_client( + app=app, + model_name=model_name, + model_path=model_path, + run_locally=False, + controller_addr=controller_addr, + local_port=6000, + start_listener=run_model, + ) + + +def connect_to_remote_model(): + startup_llm_env() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--model_name", type=str, default=model_name) + parser.add_argument("--model_path", type=str, default=None) + parser.add_argument("--model_type", type=str, default="huggingface") + parser.add_argument("--result_csv_file", type=str, default=None) + parser.add_argument("--input_lens", type=str, default="8,8,256,1024") + parser.add_argument("--output_lens", type=str, default="256,512,1024,1024") + parser.add_argument("--parallel_nums", type=str, default="1,2,4,16,32") + parser.add_argument( + "--remote_model", type=bool, default=False, help="Connect to remote model" + ) + parser.add_argument("--controller_addr", type=str, default="http://127.0.0.1:8000") + parser.add_argument("--limit_model_concurrency", type=int, default=200) + + args = parser.parse_args() + print(f"args: {args}") + model_name = args.model_name + model_path = args.model_path or LLM_MODEL_CONFIG[model_name] + result_csv_file = args.result_csv_file + input_lens = [int(i) for i in args.input_lens.strip().split(",")] + output_lens = [int(i) for i in args.output_lens.strip().split(",")] + parallel_nums = [int(i) for i in args.parallel_nums.strip().split(",")] + remote_model = args.remote_model + controller_addr = args.controller_addr + limit_model_concurrency = args.limit_model_concurrency + model_type = args.model_type + if len(input_lens) != len(output_lens): + raise ValueError("input_lens size must equal output_lens size") + + if remote_model: + # Connect to remote model and run benchmarks + connect_to_remote_model() + else: + # Start worker manager and run benchmarks + run_worker_manager( + model_name=model_name, + model_path=model_path, + start_listener=run_model, + limit_model_concurrency=limit_model_concurrency, + model_type=model_type, + ) diff --git a/scripts/run_llm_benchmarks.sh b/scripts/run_llm_benchmarks.sh new file mode 100755 index 000000000..ffa9ac6da --- /dev/null +++ b/scripts/run_llm_benchmarks.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +default_input_lens="64,64,64,512,1024,1024,2048" +default_output_lens="256,512,1024,1024,1024,2048,2048" +default_parallel_nums="1,2,4,16,32" + +input_lens=${1:-$default_input_lens} +output_lens=${2:-$default_output_lens} +parallel_nums=${3:-$default_parallel_nums} + +run_benchmark() { + local model_name=$1 + local model_type=$2 + DB_GPT_MODEL_BENCHMARK=true python pilot/utils/benchmarks/llm/llm_benchmarks.py --model_name ${model_name} --model_type ${model_type} --input_lens ${input_lens} --output_lens ${output_lens} --parallel_nums ${parallel_nums} +} + +run_benchmark "vicuna-7b-v1.5" "huggingface" +run_benchmark "vicuna-7b-v1.5" "vllm" +run_benchmark "baichuan2-7b" "huggingface" +run_benchmark "baichuan2-7b" "vllm"