123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- <?php
- namespace app\jobs;
- use app\model\WeworksingleChatFile;
- use app\model\WeworksingleChatRecord;
- use OSS\Core\OssException;
- use OSS\OssClient;
- use think\facade\Log;
- use think\queue\Job;
- use think\facade\Queue;
/**
 * Queue consumer that uploads locally stored WeWork chat files to Aliyun OSS.
 *
 * Each job processes the oldest record whose is_local flag is 1 and, when
 * further pending records exist, queues a follow-up job for them. Handling a
 * single file per job keeps one large file from exceeding the queue timeout.
 *
 * is_local values as used by this class: 1 = waiting for upload,
 * 0 = uploaded, 2 = given up (NOTE(review): meaning of 2 is inferred from
 * fire(); confirm against the table definition).
 */
class WeworkFileUpload
{
    /** Record has been uploaded to OSS. */
    private const STATE_UPLOADED = 0;

    /** Record is still waiting for upload. */
    private const STATE_PENDING = 1;

    /** Record was given up on (retries exhausted or local file missing). */
    private const STATE_FAILED = 2;

    /** Once a job has been attempted more often than this, the oldest pending file is abandoned. */
    private const MAX_ATTEMPTS = 2;

    /** Queue on which follow-up jobs are published. */
    private const QUEUE_NAME = 'upload_wework_file';

    /**
     * Queue entry point.
     *
     * @param Job   $job       The queue job being executed.
     * @param mixed $queuedata Payload passed by the producer (not used).
     */
    public function fire(Job $job, $queuedata)
    {
        try {
            if ($job->attempts() > self::MAX_ATTEMPTS) {
                // The oldest pending file keeps failing: give up on it so it
                // no longer blocks the files behind it.
                $this->abandonOldestPending();
                $job->delete();

                // Continue with the remaining files in a fresh job instead of
                // falling through with an already deleted one; this also
                // restarts the attempt counter for the next file.
                if ($this->hasPending()) {
                    $this->dispatchNext();
                }

                return;
            }

            // When jobDone() reports a failed upload the job is deliberately
            // not deleted, so the queue can retry it and the attempt counter
            // above eventually kicks in (assumes the queue driver re-delivers
            // undeleted jobs -- TODO confirm for the configured connector).
            if ($this->jobDone($queuedata)) {
                $job->delete();
            }
        } catch (OssException $e) {
            // An OSS error that escaped the upload helper (e.g. invalid client
            // configuration) ends the job immediately.
            Log::error('WeworkFileUpload aborted: ' . $e->getMessage());
            $job->delete();
        }
    }

    /**
     * Hook for jobs that exceeded the maximum number of retries.
     *
     * @param mixed $queuedata Payload of the failed job.
     */
    public function failed($queuedata)
    {
        // Intentionally empty: nothing is done here when a job finally fails.
    }

    /**
     * Uploads the oldest pending file and chains a follow-up job if needed.
     *
     * @param mixed $data Job payload (not used).
     *
     * @return bool True when the current file was dealt with (uploaded, or
     *              marked as failed because it is missing locally) or nothing
     *              was pending; false when the upload failed and the job
     *              should be retried.
     */
    private function jobDone($data): bool
    {
        // Two rows are enough: the first is processed, the presence of a
        // second tells us whether another job has to be queued.
        $pending = WeworksingleChatFile::where('is_local', '=', self::STATE_PENDING)
            ->order('id asc')
            ->limit(2)
            ->select()
            ->toArray();

        if (empty($pending)) {
            return true;
        }

        if (!$this->processFile($pending[0])) {
            // Do not chain a new job here: the record is still pending and a
            // fresh job would immediately hit the same failing file again.
            return false;
        }

        if (count($pending) > 1) {
            $this->dispatchNext();
        }

        return true;
    }

    /**
     * Uploads one file record (plus its companion file for voice messages).
     *
     * @param array $file Row of the chat-file table as an associative array.
     *
     * @return bool False only when the main upload to OSS failed.
     */
    private function processFile(array $file): bool
    {
        $localPath = public_path() . $file['path'];

        if (!file_exists($localPath)) {
            // A record without its local file can never be uploaded. Leaving
            // it pending would make every following job pick it up again.
            WeworksingleChatFile::where('id', '=', $file['id'])
                ->save(['is_local' => self::STATE_FAILED]);
            Log::error('WeworkFileUpload: local file missing: ' . $localPath);

            return true;
        }

        $chatRecord = WeworksingleChatRecord::where('msgid', '=', $file['msgid'])->findOrEmpty();
        $companyId = $chatRecord['company_id'] ?? 0;

        // Object keys are grouped by company (or "nocompany") and upload day.
        $prefix = 'chat_file/' . ($companyId ? $companyId : 'nocompany') . '/' . date('Ymd') . '/';

        $ossPath = $prefix . $file['filename'];
        if (!$this->ossUpload($ossPath, $localPath)) {
            return false;
        }

        WeworksingleChatFile::where('id', '=', $file['id'])
            ->save(['is_local' => self::STATE_UPLOADED, 'oss_path' => $ossPath]);

        // Voice messages carry a second file (other_path / other_file) that is
        // uploaded as well. A failure here is logged by ossUpload() but does
        // not fail the record, matching the previous behaviour.
        if (($chatRecord['msgtype'] ?? '') === 'voice') {
            $otherLocalPath = public_path() . $file['other_path'];
            $otherOssPath = $prefix . $file['other_file'];

            if ($this->ossUpload($otherOssPath, $otherLocalPath)) {
                WeworksingleChatFile::where('id', '=', $file['id'])
                    ->save(['oss_other_path' => $otherOssPath]);
            }
        }

        return true;
    }

    /**
     * Marks the oldest pending record as failed, if there is one.
     */
    private function abandonOldestPending()
    {
        $stuck = WeworksingleChatFile::where('is_local', '=', self::STATE_PENDING)
            ->order('id asc')
            ->findOrEmpty();

        if (!$stuck->isEmpty()) {
            $stuck->is_local = self::STATE_FAILED;
            $stuck->save();
        }
    }

    /**
     * Tells whether at least one record is still waiting for upload.
     *
     * @return bool
     */
    private function hasPending(): bool
    {
        return !WeworksingleChatFile::where('is_local', '=', self::STATE_PENDING)
            ->findOrEmpty()
            ->isEmpty();
    }

    /**
     * Queues another run of this handler with an empty payload.
     */
    private function dispatchNext()
    {
        Queue::later(0, self::class, [], self::QUEUE_NAME);
    }

    /**
     * Creates an OSS client from the application configuration.
     *
     * @return OssClient
     *
     * @throws OssException Propagated from the OssClient constructor.
     */
    private function ossClient(): OssClient
    {
        return new OssClient(
            config('app.wework_ali_oss_access_key_id'),
            config('app.wework_ali_oss_access_key_secret'),
            config('app.wework_ali_oss_end_point')
        );
    }

    /**
     * Uploads a local file to OSS.
     *
     * @param string $path Object key inside the bucket.
     * @param string $file Absolute path of the local file.
     *
     * @return bool True on success, false when the SDK reported an error.
     */
    private function ossUpload($path, $file): bool
    {
        // Built outside the try block on purpose: a client that cannot be
        // constructed is reported to fire() instead of being counted as a
        // failed upload.
        $oss = $this->ossClient();
        $bucket = config('app.wework_ali_oss_bucket');

        try {
            $oss->uploadFile($bucket, $path, $file);
        } catch (OssException $e) {
            Log::error('WeworkFileUpload: OSS upload failed for ' . $path . ': ' . $e->getMessage());

            return false;
        }

        return true;
    }

    /**
     * Checks whether an object exists in the OSS bucket.
     *
     * @param string $file Object key inside the bucket.
     *
     * @return bool Result of the existence check; false when the SDK reported an error.
     */
    private function ossCheck($file)
    {
        $oss = $this->ossClient();
        $bucket = config('app.wework_ali_oss_bucket');

        try {
            return $oss->doesObjectExist($bucket, $file);
        } catch (OssException $e) {
            Log::error('WeworkFileUpload: OSS existence check failed for ' . $file . ': ' . $e->getMessage());

            return false;
        }
    }
}
|