1
0

WeworkFileUpload.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. <?php
  2. namespace app\jobs;
  3. use app\model\WeworksingleChatFile;
  4. use app\model\WeworksingleChatRecord;
  5. use OSS\Core\OssException;
  6. use OSS\OssClient;
  7. use think\facade\Log;
  8. use think\queue\Job;
  9. use think\facade\Queue;
  /**
   * Queue consumer that uploads locally stored WeWork chat files to Aliyun OSS.
   *
   * Each job run handles at most ONE pending file so that a single large file
   * cannot make the queue worker time out. When more pending files remain, the
   * job enqueues a follow-up job for itself.
   *
   * `WeworksingleChatFile.is_local` values as used by this class:
   *   - 1: waiting to be uploaded (rows selected for processing),
   *   - 0: written after the main file was uploaded successfully,
   *   - 2: written when the job gave up on the row after repeated attempts.
   */
  class WeworkFileUpload
  {
      /** `is_local` value stored once the main file is on OSS. */
      private const STATE_UPLOADED = 0;

      /** `is_local` value of rows that still have to be uploaded. */
      private const STATE_PENDING = 1;

      /** `is_local` value stored when the job gives up on a row. */
      private const STATE_ABANDONED = 2;

      /** A job whose attempt counter exceeds this value abandons the oldest pending row. */
      private const MAX_ATTEMPTS = 2;

      /** Delay, in seconds, before a failed job run is retried. */
      private const RETRY_DELAY_SECONDS = 10;

      /** Name of the queue the follow-up jobs are pushed onto. */
      private const QUEUE_NAME = 'upload_wework_file';

      /**
       * Queue entry point: upload the oldest pending file.
       *
       * - On success the job is deleted (and `jobDone()` has already enqueued a
       *   follow-up job if more files are pending).
       * - On failure the job is released back onto the queue with a delay, so the
       *   attempt counter advances.
       * - Once the attempt counter exceeds MAX_ATTEMPTS, the oldest pending row is
       *   marked as abandoned so it stops blocking the rows behind it, and a fresh
       *   job (with a fresh attempt counter) continues with the remaining rows.
       * - An OssException that escapes (e.g. from constructing the OSS client)
       *   ends the job without retry.
       *
       * @param Job   $job       Queue job handle.
       * @param mixed $queuedata Payload passed by the producer (not used).
       */
      public function fire(Job $job, $queuedata): void
      {
          try {
              if ($job->attempts() > self::MAX_ATTEMPTS) {
                  $this->abandonOldestPending();
                  $job->delete();
                  // Continue with a new job so the next file starts with a clean
                  // attempt counter instead of inheriting this job's exhausted one.
                  $this->enqueueNext();

                  return;
              }

              if ($this->jobDone($queuedata)) {
                  $job->delete();

                  return;
              }

              // Upload failed: retry later instead of silently dropping the job.
              $job->release(self::RETRY_DELAY_SECONDS);
          } catch (OssException $e) {
              // An OSS error that was not handled further down ends the job.
              Log::error('WeworkFileUpload aborted by OSS error: ' . $e->getMessage());
              $job->delete();
          }
      }

      /**
       * Hook invoked by the queue once the job has exhausted its retries.
       * Intentionally a no-op.
       *
       * @param mixed $queuedata Payload of the failed job.
       */
      public function failed($queuedata)
      {
      }

      /**
       * Upload the oldest pending file and, if more are waiting, enqueue a follow-up job.
       *
       * Only two rows are fetched: the one to process and one more to learn whether
       * a follow-up job is needed.
       *
       * @param mixed $data Job payload (not used).
       *
       * @return bool True when there was nothing to do or the file was uploaded;
       *              false when the file could not be uploaded and the job should
       *              be retried.
       */
      private function jobDone($data): bool
      {
          $pending = WeworksingleChatFile::where('is_local', '=', self::STATE_PENDING)
              ->order('id asc')
              ->limit(2)
              ->select()
              ->toArray();

          if (count($pending) === 0) {
              return true;
          }

          $current = reset($pending);
          if (!$this->uploadRecord($current)) {
              // Do not enqueue a follow-up: it would pick the very same row again
              // right away. The caller retries this job with a delay instead.
              return false;
          }

          if (count($pending) > 1) {
              $this->enqueueNext();
          }

          return true;
      }

      /**
       * Upload one chat file row (and, for voice messages, its companion file).
       *
       * @param array $file Row of WeworksingleChatFile as an array; reads the keys
       *                    id, msgid, path, filename, other_path and other_file.
       *
       * @return bool True when the main file was uploaded and the row updated.
       */
      private function uploadRecord(array $file): bool
      {
          $localPath = public_path() . $file['path'];
          if (!is_file($localPath)) {
              Log::warning('WeworkFileUpload: local file missing for chat file #' . $file['id']);

              return false;
          }

          $chatRecord = WeworksingleChatRecord::where('msgid', '=', $file['msgid'])->findOrEmpty();
          $companyId = $chatRecord['company_id'] ?? 0;

          // Objects are grouped by company (or "nocompany") and upload date.
          $ossDir = 'chat_file/' . ($companyId ?: 'nocompany') . '/' . date('Ymd') . '/';

          $ossPath = $ossDir . $file['filename'];
          if (!$this->ossUpload($ossPath, $localPath)) {
              Log::warning('WeworkFileUpload: OSS upload failed for chat file #' . $file['id']);

              return false;
          }

          WeworksingleChatFile::where('id', '=', $file['id'])->save([
              'is_local' => self::STATE_UPLOADED,
              'oss_path' => $ossPath,
          ]);

          // Voice messages carry a second file (the comment in the original code
          // calls it the amr version) that is uploaded as well.
          if (($chatRecord['msgtype'] ?? null) === 'voice') {
              $this->uploadVoiceCompanion($file, $ossDir);
          }

          return true;
      }

      /**
       * Best-effort upload of a voice message's companion file.
       *
       * A failure here is logged but does not fail the job: the main file is
       * already uploaded and the row is no longer pending.
       *
       * @param array  $file   Row of WeworksingleChatFile as an array.
       * @param string $ossDir OSS key prefix (with trailing slash) for this row.
       */
      private function uploadVoiceCompanion(array $file, string $ossDir): void
      {
          $otherPath = (string) ($file['other_path'] ?? '');
          $otherFile = (string) ($file['other_file'] ?? '');
          if ($otherPath === '' || $otherFile === '') {
              return;
          }

          $localPath = public_path() . $otherPath;
          $ossPath = $ossDir . $otherFile;

          if (!is_file($localPath) || !$this->ossUpload($ossPath, $localPath)) {
              Log::warning('WeworkFileUpload: voice companion upload failed for chat file #' . $file['id']);

              return;
          }

          WeworksingleChatFile::where('id', '=', $file['id'])->save(['oss_other_path' => $ossPath]);
      }

      /**
       * Mark the oldest pending row as abandoned so it no longer blocks the queue.
       */
      private function abandonOldestPending(): void
      {
          $oldest = WeworksingleChatFile::where('is_local', '=', self::STATE_PENDING)
              ->order('id asc')
              ->findOrEmpty();

          if ($oldest->isEmpty()) {
              return;
          }

          $oldest->is_local = self::STATE_ABANDONED;
          $oldest->save();
      }

      /**
       * Enqueue another run of this job; the job carries no payload.
       */
      private function enqueueNext(): void
      {
          Queue::later(0, self::class, [], self::QUEUE_NAME);
      }

      /**
       * Build an OSS client from the `app.wework_ali_oss_*` configuration values.
       *
       * @throws OssException When the SDK rejects the configuration.
       */
      private function ossClient(): OssClient
      {
          return new OssClient(
              config('app.wework_ali_oss_access_key_id'),
              config('app.wework_ali_oss_access_key_secret'),
              config('app.wework_ali_oss_end_point')
          );
      }

      /**
       * Upload a local file to the configured OSS bucket.
       *
       * @param string $path Object key inside the bucket.
       * @param string $file Absolute path of the local file.
       *
       * @return bool True on success, false when the upload raised an OssException.
       *
       * @throws OssException When the client itself cannot be constructed.
       */
      private function ossUpload($path, $file)
      {
          $oss = $this->ossClient();

          try {
              $oss->uploadFile(config('app.wework_ali_oss_bucket'), $path, $file);
          } catch (OssException $e) {
              return false;
          }

          return true;
      }

      /**
       * Check whether an object exists in the configured OSS bucket.
       *
       * @param string $file Object key inside the bucket.
       *
       * @return bool Result of the existence check; false when the check raised
       *              an OssException.
       *
       * @throws OssException When the client itself cannot be constructed.
       */
      private function ossCheck($file)
      {
          $oss = $this->ossClient();

          try {
              return $oss->doesObjectExist(config('app.wework_ali_oss_bucket'), $file);
          } catch (OssException $e) {
              return false;
          }
      }
  }