ChatService.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2023 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace crmeb\services\workerman\chat;
  12. use app\services\kefu\service\StoreServiceRecordServices;
  13. use app\services\kefu\service\StoreServiceServices;
  14. use Channel\Client;
  15. use crmeb\services\workerman\ChannelService;
  16. use crmeb\services\workerman\Response;
  17. use Workerman\Connection\TcpConnection;
  18. use Workerman\Lib\Timer;
  19. use Workerman\Worker;
  /**
   * Chat service used with a Workerman worker.
   *
   * Keeps track of open connections, logged-in users and online customer-service
   * agents, dispatches incoming JSON messages to ChatHandle methods, relays
   * `crmeb_chat` channel events to connected users, and runs two timers: one
   * that closes silent connections and one that syncs the "online" flags.
   */
  class ChatService
  {
      /**
       * Interval, in seconds, between two scans for timed-out connections.
       */
      const HEARTBEAT_CHECK_INTERVAL = 15;

      /**
       * A connection whose last message is older than this many seconds is closed.
       */
      const HEARTBEAT_TIMEOUT = 12;

      /**
       * Interval, in seconds, between two synchronisations of the "online" flags.
       */
      const ONLINE_SYNC_INTERVAL = 2;

      /**
       * Worker this service was constructed with.
       *
       * @var Worker
       */
      protected $worker;

      /**
       * Open connections, keyed by connection id.
       *
       * @var TcpConnection[]
       */
      protected $connections = [];

      /**
       * Connections of logged-in users, keyed by `$connection->user->uid`.
       *
       * @var TcpConnection[]
       */
      protected $user = [];

      /**
       * Connections of online customer-service agents, keyed by
       * `$connection->kefuUser->uid`.
       *
       * @var TcpConnection[]
       */
      protected $kefuUser = [];

      /**
       * Object whose methods handle the individual message types.
       *
       * @var ChatHandle
       */
      protected $handle;

      /**
       * Response helper used to write to a connection.
       *
       * @var Response
       */
      protected $response;

      /**
       * Id returned by Timer::add() for the connection-timeout timer.
       *
       * @var int
       */
      protected $timer;

      /**
       * @param Worker $worker worker this service belongs to
       */
      public function __construct(Worker $worker)
      {
          $this->worker = $worker;
          $this->handle = new ChatHandle($this);
          $this->response = new Response();
      }

      /**
       * Register a connection as a logged-in user.
       *
       * @param TcpConnection $connection connection carrying a `user` object with a `uid`
       */
      public function setUser(TcpConnection $connection)
      {
          $this->user[$connection->user->uid] = $connection;
      }

      /**
       * Get the customer-service agents that are currently online.
       *
       * @return TcpConnection[] connections keyed by agent uid
       */
      public function kefuUser()
      {
          return $this->kefuUser;
      }

      /**
       * Register a connection as an online customer-service agent.
       *
       * @param TcpConnection $connection connection carrying a `kefuUser` object with a `uid`
       * @param bool          $isUser     when true the connection is also registered in the
       *                                  user list under `$connection->user->uid`
       */
      public function setKefuUser(TcpConnection $connection, bool $isUser = true)
      {
          $this->kefuUser[$connection->kefuUser->uid] = $connection;
          if ($isUser) {
              $this->user[$connection->user->uid] = $connection;
          }
      }

      /**
       * Look up one user connection, or get all of them.
       *
       * @param mixed $key user uid; any falsy value (null, 0, '') returns the whole list
       * @return TcpConnection|TcpConnection[]|false the connection for $key, false when that
       *                                             uid is not registered, or the whole list
       */
      public function user($key = null)
      {
          return $key ? ($this->user[$key] ?? false) : $this->user;
      }

      /**
       * Remember a new connection and start its idle clock.
       *
       * NOTE(review): presumably wired as the worker's onConnect callback; the wiring
       * is not visible in this file.
       *
       * @param TcpConnection $connection
       */
      public function onConnect(TcpConnection $connection)
      {
          $this->connections[$connection->id] = $connection;
          $connection->lastMessageTime = time();
      }

      /**
       * Handle one raw message received on a connection.
       *
       * The payload is decoded as JSON. A payload without a usable `type`, or with
       * type `ping`, is answered with a `ping` success response carrying the current
       * time. Any other type is dispatched to the ChatHandle method of the same name;
       * unknown types are ignored.
       *
       * @param TcpConnection $connection
       * @param string        $res raw JSON payload
       * @return mixed result of the ping response, otherwise null
       */
      public function onMessage(TcpConnection $connection, $res)
      {
          $connection->lastMessageTime = time();
          $res = json_decode($res, true);
          if (!$res || !isset($res['type']) || !$res['type'] || $res['type'] == 'ping') {
              return $this->response->connection($connection)->success('ping', ['now' => time()]);
          }

          $type = $res['type'];
          // The type comes straight from the client: never let it reach
          // method_exists()/a dynamic call unless it is a plain, non-magic method name.
          if (!$this->canDispatch($type)) {
              return;
          }

          try {
              $this->handle->{$type}($connection, $res + ['data' => []], $this->response->connection($connection));
          } catch (\Throwable $e) {
              // A failing handler must not take the connection (or the worker) down,
              // but it should not disappear without a trace either.
              $this->logHandlerError($type, $e);
          }
      }

      /**
       * Worker start hook: connects to the channel service, subscribes to the
       * `crmeb_chat` event and installs the timeout and online-sync timers.
       *
       * @param Worker $worker worker whose connections are scanned for timeouts
       */
      public function onWorkerStart(Worker $worker)
      {
          // "connet" is the method name declared by ChannelService.
          ChannelService::connet();

          Client::on('crmeb_chat', function ($eventData) {
              $this->dispatchChannelEvent($eventData);
          });

          $this->timer = Timer::add(self::HEARTBEAT_CHECK_INTERVAL, function () use ($worker) {
              $this->closeTimedOutConnections($worker);
          });

          Timer::add(self::ONLINE_SYNC_INTERVAL, function () {
              $this->syncOnlineStatus();
          });
      }

      /**
       * Forget a closed connection in every registry it may be part of.
       *
       * @param TcpConnection $connection
       */
      public function onClose(TcpConnection $connection)
      {
          unset($this->connections[$connection->id]);
          if (isset($connection->user->uid)) {
              unset($this->user[$connection->user->uid]);
          }
          if (isset($connection->kefuUser->uid)) {
              unset($this->kefuUser[$connection->kefuUser->uid]);
          }
      }

      /**
       * Deliver one `crmeb_chat` channel event to the targeted users.
       *
       * Recognised keys of $eventData:
       *  - `type`: event name (required, events without it are ignored);
       *  - `ids`:  uid or list of uids to target; empty or missing targets every user;
       *  - `fun`:  when truthy, `type` is invoked as a ChatHandle method for each
       *            target, otherwise `type` and `data` are sent as a success response;
       *  - `data`: payload.
       *
       * @param mixed $eventData event payload received from the channel
       */
      protected function dispatchChannelEvent($eventData)
      {
          if (!isset($eventData['type']) || !$eventData['type']) {
              return;
          }

          $type = $eventData['type'];
          $useHandler = $eventData['fun'] ?? false;
          if ($useHandler && !$this->canDispatch($type)) {
              Worker::log('[chat] crmeb_chat event names an unknown handler, event ignored');
              return;
          }

          // Accept a single uid as well as a list; nothing given means "everyone".
          $ids = (array)($eventData['ids'] ?? []);
          if (!$ids) {
              $ids = array_keys($this->user);
          }

          foreach ($ids as $id) {
              if (!isset($this->user[$id])) {
                  continue;
              }
              $connection = $this->user[$id];
              $response = $this->response->connection($connection);
              if (!$useHandler) {
                  $response->success($type, $eventData['data'] ?? null);
                  continue;
              }
              try {
                  $this->handle->{$type}($connection, $eventData + ['data' => []], $response);
              } catch (\Throwable $e) {
                  // Keep delivering to the remaining users when one handler call fails.
                  $this->logHandlerError($type, $e);
              }
          }
      }

      /**
       * Close every connection that has been silent for longer than
       * HEARTBEAT_TIMEOUT seconds and tell the online agents which user went offline.
       *
       * @param Worker $worker worker whose connections are scanned
       */
      protected function closeTimedOutConnections(Worker $worker)
      {
          $now = time();
          foreach ($worker->connections as $connection) {
              if ($now - $connection->lastMessageTime <= self::HEARTBEAT_TIMEOUT) {
                  continue;
              }

              $hasUid = isset($connection->user->uid);
              $uid = $hasUid ? $connection->user->uid : null;

              // Mark the user's service record offline.
              // NOTE(review): the tourist flag is read from `$connection->user` here but
              // from `$connection` in syncOnlineStatus() - confirm which one is set.
              if ($hasUid && !isset($connection->user->isTourist)) {
                  /** @var StoreServiceRecordServices $service */
                  $service = app()->make(StoreServiceRecordServices::class);
                  $service->updateRecord(['to_uid' => $uid], ['online' => 0]);
              }

              $this->response->connection($connection)->close('timeout');

              if (!$hasUid) {
                  continue;
              }

              // Broadcast to the agents which user went offline.
              foreach ($this->kefuUser as $kefuUid => $kefuConnection) {
                  // Loose comparison kept on purpose: array keys are ints while the
                  // uid may be a numeric string.
                  if ($uid == $kefuUid) {
                      continue;
                  }
                  if (isset($kefuConnection->onlineUids)
                      && ($key = array_search($uid, $kefuConnection->onlineUids)) !== false
                  ) {
                      unset($kefuConnection->onlineUids[$key]);
                  }
                  $this->response->connection($kefuConnection)->send('user_online', ['to_uid' => $uid, 'online' => 0]);
              }
          }
      }

      /**
       * Mark as offline every user record and every agent that is not among the
       * currently registered connections.
       *
       * Nothing is updated for a group (users / agents) whose registry is empty.
       */
      protected function syncOnlineStatus()
      {
          $uids = [];
          foreach ($this->user() as $uid => $connection) {
              if (!isset($connection->isTourist)) {
                  $uids[] = $uid;
              }
          }
          if ($uids) {
              // Everyone except the users that are online right now goes offline.
              /** @var StoreServiceRecordServices $service */
              $service = app()->make(StoreServiceRecordServices::class);
              $service->updateOnline(['notUid' => $uids], ['online' => 0]);
          }

          $kefuUid = array_keys($this->kefuUser());
          if ($kefuUid) {
              /** @var StoreServiceServices $kefuService */
              $kefuService = app()->make(StoreServiceServices::class);
              $kefuService->updateOnline(['notUid' => $kefuUid], ['online' => 0]);
          }
      }

      /**
       * Tell whether a message/event type may be invoked as a ChatHandle method.
       *
       * @param mixed $type type taken from a message or channel event
       * @return bool true for a string naming an existing, non-magic method of the handle
       */
      protected function canDispatch($type)
      {
          return is_string($type)
              && strncmp($type, '__', 2) !== 0
              && method_exists($this->handle, $type);
      }

      /**
       * Write a failed handler call to the Workerman log.
       *
       * @param string     $type handler name that was invoked
       * @param \Throwable $e    error raised by the handler
       */
      protected function logHandlerError($type, \Throwable $e)
      {
          Worker::log(sprintf(
              '[chat] handler "%s" failed: %s in %s:%d',
              $type,
              $e->getMessage(),
              $e->getFile(),
              $e->getLine()
          ));
      }
  }