workerman源码-workerman启动流程

前面我们跟着代码看了一遍workerman的初始化流程.但对于如何监听端口.等操作还没有具体的实现.我们这次就来看一下.workerman是如何监听端口并运行的.

runAll

在前面我们初始化方法过后,就开始执行runAll方法.在runAll方法中,有如下几个方法.我们会一个一个方法介绍.注意.在这里,所有的都是以static来运行的.所以,就是Worker类本身.

// 初始化环境变量.是否为linux.  
static::checkSapiEnv();  
// 执行初始化流程  
static::init();  
// 锁定命令  
static::lock();  
// 解析命令行  
static::parseCommand();  
// 是否后台运行  
static::daemonize();  
// 初始化worker  
static::initWorkers();  
// 安装信号处理  
static::installSignal();  
// 保存主进程ID  
static::saveMasterPid();  
// 解锁  
static::unlock();  
// 展示ui  
static::displayUI();  
// fork子进程  
static::forkWorkers();  
// 重定义输入输出  
static::resetStd();  
// 监控子进程.  
static::monitorWorkers();  

checkSapiEnv

检查当前命令行的环境.至于为什么会有这个函数,主要是由于window下.只能开启一个进程.所以在很多地方都需要对于windows来进行特殊的处理.我们来看看代码.

// 如果不是cli运行,就直接提示.  
if (php_sapi_name() != "cli") {  
   exit("only run in command line mode n");
}  
// 如果根据文件分隔符来判断,windows下为,linux为/.这是一种常用的方式.  
if (DIRECTORY_SEPARATOR === '') {  
   self::$_OS = OS_TYPE_WINDOWS;
}  

init

这里主要是对错误处理,pid文件,和日志文件的初始化.还有初始化计时器

set_error_handler(function($code, $msg, $file, $line){  
 Worker::safeEcho("$msg in file $file on line $linen");
});    
// 这里指定了启动文件.启动文件将用于后续的启动信息.  
$backtrace        = debug_backtrace();  
// startFile格式为全路径.例如在/var/www/study/https.php,那么startFile就为/var/www/study/https.php.  
static::$_startFile = $backtrace[count($backtrace) - 1]['file'];  
// 根据启动生成唯一的前缀. 根据startFile的全路径生成唯一的前缀.  
$unique_prefix = str_replace('/', '_', static::$_startFile);  
  
// 根据前缀生成pidFile.存放于vendor/workerman下面.  
if (empty(static::$pidFile)) {  
 static::$pidFile = __DIR__ . "/../$unique_prefix.pid";
}  
  
// 生成日志文件.  
if (empty(static::$logFile)) {  
 static::$logFile = __DIR__ . '/../workerman.log';
}  
  
$log_file = (string)static::$logFile;  
// 如果没有日志文件,就初始话日志文件.  
if (!is_file($log_file)) {  
    touch($log_file); 
    chmod($log_file, 0622);
}  
  
// 初始化状态为启动中...  
static::$_status = static::STATUS_STARTING;  
  
// 设置启动时间和状态文件  
static::$_globalStatistics['start_timestamp'] = time();  
static::$_statisticsFile                      = sys_get_temp_dir() . "/$unique_prefix.status";  
  
// 设置进程标题  
static::setProcessTitle('WorkerMan: master process  start_file=' . static::$_startFile);  
  
// 初始化worker的idmap.  
static::initId();  
  
// 初始化计时器.  
Timer::init();

这里面其中最主要的是初始化worker的idmap,用于记录worker的对应关系. 这里主要存储子worker的pid信息.我们来看下函数.

protected static function initId()  
{  
 // 循环所有的worker,并将所有的进程信息初始化赋值.  
 foreach (static::$_workers as $worker_id => $worker){ 
    $new_id_map = array(); 
    $worker->count = $worker->count <= 0 ? 1 : $worker->count; 
    for($key = 0; $key < $worker->count; $key++) {  
        $new_id_map[$key] = isset(static::$_idMap[$worker_id][$key]) ? static::$_idMap[$worker_id][$key] : 0; 
    } 
    static::$_idMap[$worker_id] = $new_id_map; 
 }
}  

初始化完成会形成,worker_id => 子进程信息.例如

array(1) {  
 ["000000001364d94e00000000462a38e7"]=> array(4) { 
 [0]=> int(0) 
 [1]=> int(0) 
 [2]=> int(0) 
 [3]=> int(0) 
 }
}  

这样的结构便于将id对应到对应的信息中.

lock

protected static function lock()  
{  
 // 打开锁定的启动文件.  
 $fd = fopen(static::$_startFile, 'r'); // 如果文件无法打卡.则默认认为workerman已经启动了.  
 if (!$fd || !flock($fd, LOCK_EX)) {            
    static::log("Workerman[".static::$_startFile."] already running"); exit; 
 }
}  

为何需要锁定启动文件.这里会引发一个问题,如果同一时间启动worker.这样会导致无法预知的错误.所以在这里就先锁定启动文件.就保存的原子性操作.

parseCommand

完成了文件锁定后,就开始解析我们的命令了.由于此次信息较多.会单独写一个文章来解读.

daemonize

daemonize是否运行在后台,如果我们在命令行中使用了-d参数,就会执行此函数.但是我们执行的命令行没有加上-d.所以,就不会执行此函数.我们可以来看一下.

protected static function daemonize()  
{  
 // 如果不是后台运行,如果不是linux.就直接返回.  
 if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) { 
    return; 
 } 
 // 设定umask.  
 umask(0); // 开始分叉,并返回pid.如果分叉失败,pid就为-1.如果成功pid就为0.  
 $pid = pcntl_fork(); 
 if (-1 === $pid) { 
    throw new Exception('fork fail'); 
 } elseif ($pid > 0) { exit(0); } 
 // 将当前会话设置为进程组长,并管理其他子进程. 设置父进程先与子进程退出,子进程则会被1号进程收养,这个子进程就会成为init的子进程.  
 if (-1 === posix_setsid()) { 
    throw new Exception("setsid fail"); 
 } 
 // 再次分叉进程.以避免SVR4系统重新获得终端的控制.  
 $pid = pcntl_fork(); 
 if (-1 === $pid) { 
    throw new Exception("fork fail"); } elseif (0 !== $pid) { exit(0); 
 }
}  

initWorkers

执行完后台运行过后,就开始初始化worker了.

protected static function initWorkers()  
{  
 // 如果系统不是linux就直接返回.  
 if (static::$_OS !== OS_TYPE_LINUX) { 
     return; 
 } 
 // 开始初始化worker  
 foreach (static::$_workers as $worker) { 
     // 设置worker name.  
     if (empty($worker->name)) { 
        $worker->name = 'none'; 
     }  
     // 设置worker的user  
     if (empty($worker->user)) { 
        $worker->user = static::getCurrentUser(); 
     } else { 
         if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) { 
             static::log('Warning: You must have the root privileges to change uid and gid.'); 
         } 
     }  
     // 设置socket名称.  
     $worker->socket = $worker->getSocketName();  
     // 设置状态.  
     $worker->status = '<g> [OK] </g>';  
     // 设置ui.  
     foreach(static::getUiColumns() as $column_name => $prop){ 
        !isset($worker->{$prop}) && $worker->{$prop}= 'NNNN'; 
        $prop_length = strlen($worker->{$prop}); 
        $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength'; 
        static::$$key = max(static::$$key, $prop_length); 
     }  
     // 监听.  
     if (!$worker->reusePort) { $worker->listen(); } }}  

installSignal

初始化完worker过后,就监听信号集了.在这里监听的信号集.信号集只能在linux中使用.

protected static function installSignal()  
{  
 if (static::$_OS !== OS_TYPE_LINUX) { 
    return; 
 } 
 // 进程关闭,Ctrl+c时触发  
 pcntl_signal(SIGINT, array('WorkermanWorker', 'signalHandler'), false); 
 // 进程强制关闭,例如使用kill.等命令关闭进程时触发.  
 pcntl_signal(SIGTERM, array('WorkermanWorker', 'signalHandler'), false); 
 // 进程重启  
 pcntl_signal(SIGUSR1, array('WorkermanWorker', 'signalHandler'), false); 
 // 进程退出  
 pcntl_signal(SIGQUIT, array('WorkermanWorker', 'signalHandler'), false); 
 // 查看进程的状态  
 pcntl_signal(SIGUSR2, array('WorkermanWorker', 'signalHandler'), false); 
 // 查看链接状态  
 pcntl_signal(SIGIO, array('WorkermanWorker', 'signalHandler'), false); 
 // 屏蔽SIGPIPE消息  
 pcntl_signal(SIGPIPE, SIG_IGN, false);
}  

saveMasterPid

将主进程的ID保存到pidFile文件中.

protected static function saveMasterPid()  
{  
 if (static::$_OS !== OS_TYPE_LINUX) { 
    return; 
 }  
 static::$_masterPid = posix_getpid(); // 将主进程ID保存到pidFile中,pidFile在init时已经指定为vendor/workerman/下面的pid文件中去了.  
 if (false === file_put_contents(static::$pidFile, static::$_masterPid)) { 
    throw new Exception('can not save pid to ' . static::$pidFile); 
 }
}  

unlock

当以上都执行完成过后就解除我们启动文件的锁定状态.这个函数主要就是进行解锁操作.

protected static function unlock()  
{  
     $fd = fopen(static::$_startFile, 'r'); $fd && flock($fd, LOCK_UN);
}  

displayUI

显示ui.workerman在处理的时候,是先展示的ui.就是打印出类似于以下情况的信息

Workerman[http.php] start in DEBUG mode  
----------------------------------------- WORKERMAN -----------------------------------------  
Workerman version:3.5.22          PHP version:7.1.23-4+ubuntu18.04.1+deb.sury.org+1  
------------------------------------------ WORKERS ------------------------------------------  
proto   user            worker          listen                 processes    status 
tcp     adolph          none            http://0.0.0.0:2345    4             [OK] 
---------------------------------------------------------------------------------------------  
Press Ctrl+C to stop. Start success.  

所以,信息打印出来过后也不一定会正确的监听事件.下面我们就继续查看.

forkWorkers

以上的准备工作完成,就开始fork我们的子进程了.并设置子进程的监听.

rotected static function forkWorkers()  
{  
    if (static::$_OS === OS_TYPE_LINUX) { 
        // 如果是linux就按照fork  
        static::forkWorkersForLinux(); 
    } else { 
        // windows下监听  
        static::forkWorkersForWindows(); 
    }
}  

根据平台来处理对应的子进程.我们是在linux中,所以我们就看一下static::forkWorkersForLinux这个函数.

protected static function forkWorkersForLinux()  
{  
    // 根据所有的worker,我们的状态信息在init中存放信息.  
    foreach (static::$_workers as $worker) { 
        if (static::$_status === static::STATUS_STARTING) { 
            if (empty($worker->name)) { 
                $worker->name = $worker->getSocketName(); 
            } 
            $worker_name_length = strlen($worker->name); 
            if (static::$_maxWorkerNameLength < $worker_name_length) { 
                static::$_maxWorkerNameLength = $worker_name_length; 
            } 
        } 
        // 开始fork.从$_pidMap来获取信息并判断.这里的$_pidMap则是由workerId => 子进程的进程号组成的数组.  
        // 而_pidMap则是在new Worker的时候将Worker直接存储进去的.所以在这里我们就解决了在初始化流程的时候疑问了.  
        while (count(static::$_pidMap[$worker->workerId]) < $worker->count) { 
        // 开始fork  
            static::forkOneWorkerForLinux($worker); 
        } 
    }
}  

static::forOneWorkerForLinux函数主要是来处理worker信息.这里我们来看一下.

protected static function forkOneWorkerForLinux($worker)  
{  
    // 开始获取id信息. 从idMap中获取   
    // array_search($pid, static::$_idMap[$worker_id]);  
    // 由于我们的idMap是由workerId=>子进程号组成的数组.所以,第一次查出来的id指定为0.  
    $id = static::getId($worker->workerId, 0); 
    if ($id === false) { 
        return; 
    } 
    // 开始fork
    $pid = pcntl_fork(); 
    // 如果pid大于0. 则是主进程.  
    if ($pid > 0) { 
        // 存到pidMap中,  
        static::$_pidMap[$worker->workerId][$pid] = $pid; 
        // 将idMap中的id换为进程号.这里的进程号就包含了主进程的信息.  
        static::$_idMap[$worker->workerId][$id]   = $pid; 
    } elseif (0 === $pid) {  
        // 如果是子进程.  
        srand(); 
        mt_srand(); 
        // 开始监听事件.  
        if ($worker->reusePort) { 
            $worker->listen(); 
        } 
        // 如果状态为STATUS_STARTING,就重定向输入输出.  
        if (static::$_status === static::STATUS_STARTING) { 
            static::resetStd(); 
        } 
        // 重新初始化pid. 所以,在子进程中.子进程的pidMap为空.  
        static::$_pidMap  = array(); 
        // 开始处理解除掉其他worker的监听.保证当前子进程中,只有一个worker来监听事件.  
        foreach(static::$_workers as $key => $one_worker) { 
            if ($one_worker->workerId !== $worker->workerId) { 
            $one_worker->unlisten(); unset(static::$_workers[$key]); 
            } 
        } 
        // 删除所有的定时器.  
        Timer::delAll();
        static::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());    
        // 设置用户和组.  
        $worker->setUserAndGroup(); 
        $worker->id = $id; 
        // 执行run方法.  
        $worker->run(); 
        $err = new Exception('event-loop exited'); 
        static::log($err); 
        exit(250); 
    } else { 
        throw new Exception("forkOneWorker fail"); 
    }
}  

上面代码如果在linux中运行,就已经分为主进程和子进程来进行处理了.在主进程中包含所有对子进程信息的保存.而子进程则在这里监听方法.需要注意一点的是,子进程中对所有父进程的变量是共享的.我们来看看子进程是如何监听事件的.$worker->run();

public function run()  
{  
    //更新状态为运行中.  
    static::$_status = static::STATUS_RUNNING;  
    // 注册关闭事件.  
    register_shutdown_function(array("WorkermanWorker", 'checkErrors'));  
    // 设置root的路径  
    Autoloader::setRootPath($this->_autoloadRootPath);  
    // 创建事件监听  
    if (!static::$globalEvent) { 
        $event_loop_class = static::getEventLoopName(); 
        static::$globalEvent = new $event_loop_class;           $this->resumeAccept(); 
    }  
    // 重新安装信号集.  
    static::reinstallSignal();  
    // 初始化计时器  
    Timer::init(static::$globalEvent);  
    // 设置消息时间.  
    if (empty($this->onMessage)) { 
        $this->onMessage = function () {}; 
    } 
    // 还原错误处理函数.  
    restore_error_handler(); 
    // 触发workerStart事件.  
    if ($this->onWorkerStart) { 
        try { 
            call_user_func($this->onWorkerStart, $this); 
        } catch (Exception $e) { 
            static::log($e); 
            // Avoid rapid infinite loop exit.
            sleep(1); 
            exit(250); 
        } catch (Error $e) { 
            static::log($e); 
            // Avoid rapid infinite loop exit. 
            sleep(1); 
            exit(250); 
        } 
    }  
    // 等待事件触发.  
    static::$globalEvent->loop();
}  

run函数开始触发workerStart事件,并等待worker事件触发.所以这里的globalEvent就是我们的事件处理函数.以上都是在子进程中完成的处理.子进程就会在这里停止.

resetStd

当我们fork完所有的子进程过后,就执行我们的重定向输入输出.

public static function resetStd()  
{  
    // 如果不是守护进程.  
    if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) { 
        return; 
    }  
    global $STDOUT, $STDERR; 
    // 打开输出的文件.  
    $handle = fopen(static::$stdoutFile, "a"); 
    if ($handle) { 
        unset($handle); 
        set_error_handler(function(){});    
        fclose($STDOUT); 
        fclose($STDERR); 
        fclose(STDOUT); 
        fclose(STDERR); 
        $STDOUT = fopen(static::$stdoutFile, "a"); 
        $STDERR = fopen(static::$stdoutFile, "a"); 
        // 重新定向到自己定义输出里面.  
        static::outputStream($STDOUT);      
        restoe_error_handler(); 
    } else { 
        throw new Exception('can not open stdoutFile ' . static::$stdoutFile); 
    }
}  

monitorWorkers()

最后,就是监听我们的子进程信息.这里由于我们的篇幅过长,就另起一张写入.

未完成

workerman对事件的监听  
workerman对子进程的监控  
Image placeholder
adolph
未设置
  26人点赞

没有讨论,发表一下自己的看法吧

推荐文章
使用 Workerman 做一个聊天室

为什么要写这篇文章? 我学习Workerman好几次了,每次都失败(没做成想要的功能,原谅我比较笨)。但是这次也花了好几个小时,把之前没做成的功能实现了。其实就是两个简单的功能:一对一发送消息,广

深入探究 RocketMQ 事务机制的实现流程,为什么它能做到发送消息零丢失?

1、解决消息丢失的第一个问题:订单系统推送消息领丢失既然我们已经明确了消息在基于MQ传输的过程中可能丢失的几个地方,那么我们接着就得一步一步考虑如何去解决各个环节丢失消息的问题,首先要解决的第一个问题

简介PWA与Service Worker

1.关于PWAPWA,全称Progressivewebapps,即渐进式Web应用。PWA技术主要作用为构建跨平台的Web应用程序,并使其具有与原生应用程序相同的用户体验。早在2014年,W3C就已经

基于内存和文件存储的 queue worker, 不用 Redis 适合单进程使用没有外部依赖

因为最近要做一个简单的并发任务系统,在github上面找了一圈并没有简单可依赖的库,所以自己写了一个。欢迎大家Review贡献代码。项目地址https://github.com/iflamed/mfw

聊聊chronos的DeleteBgWorker

序本文主要研究一下chronos的DeleteBgWorkerDeleteBgWorkerDDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chron

玩转 GitHub Actions,简化 npm 发布流程

Github最近添加了一项名为GithubActions的新功能,为我们带来了一套强大的工作流系统,可以处理各种各样的任务。我们在发布Node.js包时可以使用Actions自动运行测试,然后自动将

02.3. 流程和函数

这小节我们要介绍Go里面的流程控制以及函数操作。 流程控制 流程控制在编程语言中是最伟大的发明了,因为有了它,你可以通过很简单的流程描述来表达很复杂的逻辑。Go中流程控制分三大类:条件判断,循环控制和

Python入门教程_4. 深入 Python 流程控制

除了刚刚介绍的while语句,Python还有一些在其他语言中常见的控制流语句,并做了一些改动。 4.1.if语句 也许最著名的语句是if语句了。 例如: >>>x=int(input("Please

从0到1,马蜂窝大交通团队如何构建高效研发流程体系?

“旅游之前,先上马蜂窝”已经成为许多人习惯性的选择。2019年5月,马蜂窝完成了新一轮融资,金额达2.5亿美元。这也标志着通过集内容、社区、交易为一体的消费决策场景构建,从攻略社区起家的马蜂窝开始迈入

老焦专栏 | 用 RACI 模式梳理业务流程,提高业务发布的效率

转载本文需注明出处:微信公众号EAWorld,违者必究。最近经常在不同场合说,技术发展已经进入深水区。IT技术发展已经越来越成熟了,尤其在金融行业,以前是解决从无到有的问题,现在该有的系统都有了,是解

为什么说IPA智能流程自动化是企业IT的下一波浪潮?

提到IPA,可能很多人会立刻想到RPA。RPA,即机器人流程自动化,是企业IT过去两年最热门的技术之一。仅在2018年,就有三家公司拿到了总额超过十亿美金的风投,包括AnywhereAutomatio

“分库分表” ?选型和流程要慎重,否则会失控

数据库中间件之分库分表恭喜你,贵公司终于成长到一定规模,需要考虑高可用,甚至分库分表了。但你是否知道分库分表需要哪些要素?拆分过程是复杂的,提前计划,不要等真正开工,各种意外的工作接踵而至,以至失控。

流程控制 if

流程控制:对计算机执行代码的管控。分类:顺序、分支、循环循序结构是系统默认自发从上而下执行。分支结构:单项分支(if...)当条件表达式为真,则执行代码组中的内容,为假则跳过。双向分支(if...el

流程控制 for、while

循环结构while循环执行结束后会回到while的位置,如果没有设定结束条件,会一直循环到资源耗尽。【死循环】为while循环设定一个结束条件使之变为带有计数格式的循环,当条件达到跳出循环。 for循

其他流程控制语句

break用于终止循环跳出当前大循环continue跳过本次循环开启下一次循环pass保证代码语法不出错

【Git入门234】Git本地仓库常用操作流程2_分支相关操作_大总结

创建分支gitbranchgitbranchx 仅创建一个叫x的分支拷贝,不对代码进行任何变动 可以创造平行时间线x 术语叫“分支” 虚拷贝,引用 可以同时基于master/branchx开发

0103-springmvc的基本流程

背景现在的it研发,已经从管理系统时代迈入了互联网系统时代。页面开发已经从基于JSP+struts转变为为前后端分离的方式(springMVC+JS);思想MVCmvc框架不仅适用于java的开发,也

【Git入门5】Git远程仓库 - GitHub常用操作流程1_配置

=================================== 本地仓库和远程仓库是完全隔开的 代码存储在云端GitHub SSHkey验证身份使用场景:可以避免我们重复的输入密码,提高开发效

SpringMVC加载流程

 这节介绍SpringMVC,SpringMVC是一种基于Java的实现MVC设计模式的请求驱动类型的轻量级Web框架。本章会介绍相关概念,流程,再从源码进行讲解。1.MVC MVC(ModelVie

敏捷开发流程之Scrum:3个角色、5个会议、12原则

本文主要从Scrum的定义和目的、敏捷宣言、Scrum中的人员角色、Scrum开发流程、敏捷的12原则等几方面帮助大家理解Scrum敏捷开发的全过程。一、Scrum的定义和目的Scrum是一个用于开发

Java语言的流程控制笔记

当for循环里面只有分号的时候是无限循环

JavaScript的运算符和流程控制笔记

"usestrict""usestrict""usestrict"严格模式

REST framework JWT Auth

JWT认证的REST框架 概述 这个包提供对DjangoRESTframework的JSONWebToken认证支持。 需要满足条件 Python(2.7,3.3,3.4,3.5) Django(1.

【翻译】REST framework JWT Auth

JWT认证的REST框架原文链接概述这个包提供对DjangoRESTframework的JSONWebToken认证支持。需要满足条件Python(2.7,3.3,3.4,3.5) Django(1.