WorkermanService.php 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2023 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace crmeb\services\workerman;
  12. use Channel\Client;
  13. use Workerman\Connection\TcpConnection;
  14. use Workerman\Lib\Timer;
  15. use Workerman\Worker;
  16. class WorkermanService
  17. {
  18. /**
  19. * @var Worker
  20. */
  21. protected $worker;
  22. /**
  23. * @var TcpConnection[]
  24. */
  25. protected $connections = [];
  26. /**
  27. * @var TcpConnection[]
  28. */
  29. protected $user = [];
  30. /**
  31. * @var WorkermanHandle
  32. */
  33. protected $handle;
  34. /**
  35. * @var Response
  36. */
  37. protected $response;
  38. /**
  39. * @var int
  40. */
  41. protected $timer;
  42. public function __construct(Worker $worker)
  43. {
  44. $this->worker = $worker;
  45. $this->handle = new WorkermanHandle($this);
  46. $this->response = new Response();
  47. }
  48. public function setUser(TcpConnection $connection)
  49. {
  50. $this->user[$connection->adminInfo['id']] = $connection;
  51. }
  52. public function onConnect(TcpConnection $connection)
  53. {
  54. $this->connections[$connection->id] = $connection;
  55. $connection->lastMessageTime = time();
  56. }
  57. public function onMessage(TcpConnection $connection, $res)
  58. {
  59. $connection->lastMessageTime = time();
  60. $res = json_decode($res, true);
  61. if (!$res || !isset($res['type']) || !$res['type'] || $res['type'] == 'ping') {
  62. return $this->response->connection($connection)->success('ping', ['now' => time()]);
  63. }
  64. var_dump('onMessage', $res);
  65. if (!method_exists($this->handle, $res['type'])) return;
  66. $this->handle->{$res['type']}($connection, $res + ['data' => []], $this->response->connection($connection));
  67. }
  68. public function onWorkerStart(Worker $worker)
  69. {
  70. var_dump('onWorkerStart');
  71. ChannelService::connet();
  72. Client::on('crmeb', function ($eventData) use ($worker) {
  73. if (!isset($eventData['type']) || !$eventData['type']) return;
  74. $ids = isset($eventData['ids']) && count($eventData['ids']) ? $eventData['ids'] : array_keys($this->user);
  75. foreach ($ids as $id) {
  76. if (isset($this->user[$id]))
  77. $this->response->connection($this->user[$id])->success($eventData['type'], $eventData['data'] ?? null);
  78. }
  79. });
  80. $this->timer = Timer::add(15, function () use (&$worker) {
  81. $time_now = time();
  82. foreach ($worker->connections as $connection) {
  83. if ($time_now - $connection->lastMessageTime > 12) {
  84. $this->response->connection($connection)->close('timeout');
  85. }
  86. }
  87. });
  88. }
  89. public function onClose(TcpConnection $connection)
  90. {
  91. var_dump('onClose');
  92. unset($this->connections[$connection->id]);
  93. }
  94. }