stationService = null; $this->stationTypeService = null; $this->stationTaskBatchTypeService = null; $this->batchService = null; $this->stationTaskService = null; $this->foreignHaiRoboticsService = null; } /** * @param Collection $batches Batch[] * @param Collection $stationTasks_toAttach * @return Collection * @throws Exception */ function createByBatches(Collection $batches, Collection $stationTasks_toAttach): Collection { $this->stationService = app('StationService'); $this->stationTaskService = app('StationTaskService'); $this->stationTypeService = app('StationTypeService'); $this->stationTaskBatchTypeService = app('StationTaskBatchTypeService'); $this->batchService = app('BatchService'); $stationTaskBatches_toCreate = new Collection(); $stationTaskBatchType = $this->stationTaskBatchTypeService->firstByWhere('name', 'U型线分捡'); $id_stationTaskBatchType=$stationTaskBatchType['id']??''; $batches_handled = collect(); foreach ($batches as $batch) { if ($batch['status'] != '已处理') { $stationType = $this->stationTypeService->getByBatch($batch); $station = $this->stationService->getStation_byType($stationType['name']); $stationTaskBatches_toCreate->push( new StationTaskBatch([ 'batch_id' => $batch['id'], 'station_id' => $station['id'], 'station_task_batch_type_id' => $id_stationTaskBatchType, 'status' => '待处理' ]) ); $batches_handled->push($batch); } } $this->batchService->updateWhereIn('id', data_get($batches_handled, '*.id'), ['status' => '处理中']); $this->insert($stationTaskBatches_toCreate->toArray()); $stationTaskBatches_toCreate=$this->getAndAttachIds($stationTaskBatches_toCreate); $this->stationTaskService->registerSubTasks( $stationTasks_toAttach, $stationTaskBatches_toCreate->map(function($taskBatch){ return [$taskBatch]; }) ); $this->stationTaskService->registerStations( $stationTasks_toAttach, data_get($stationTaskBatches_toCreate,'*.station_id') ); return $stationTaskBatches_toCreate; } function getAndAttachIds($stationTaskBatches): Collection { $md5=md5(is_array($stationTaskBatches) ?$md5=json_encode($stationTaskBatches):$stationTaskBatches->toJson()); return Cache::remember( 'StationTaskBatch_'.$md5??md5(json_encode($stationTaskBatches->toArray())) ,config('cache.expirations.rarelyChange') ,function()use($stationTaskBatches){ return StationTaskBatch::query() ->whereIn('status',data_get($stationTaskBatches,'*.status')) ->whereIn('batch_id',data_get($stationTaskBatches,'*.batch_id')) ->orderByDesc('id') ->limit(count($stationTaskBatches)) ->get(); }); } function markManyExcepted(Collection $stationTaskBatches_failed) { foreach ( $stationTaskBatches_failed as $stationTaskBatch) { if($stationTaskBatch['status']!='异常') $this->markExcepted($stationTaskBatch); } ($logAtFailings_andWait = function ($stationTaskBatches_failed) { if ($stationTaskBatches_failed->isEmpty()) return; throw new ErrorException('任务波次异常失败的'); })($stationTaskBatches_failed); } /** * @param Collection|null $stationTaskBatches * @return Collection|\Tightenco\Collect\Support\Collection|null 返回执行失败的记录 * @throws ErrorException */ function runMany(?Collection $stationTaskBatches):?Collection { LogService::log(__METHOD__,'runMany','波次任务分配6.1:'.json_encode($stationTaskBatches)); $stationTaskBatches_failed = null; ($execute = function ( Collection $stationTaskBatches, &$stationTaskBatches_failed) { LogService::log(__METHOD__,'runMany','波次任务分配6.2:'.json_encode($stationTaskBatches)); if ($stationTaskBatches->isEmpty()) return; LogService::log(__METHOD__,'runMany','波次任务分配6.3:'.json_encode($stationTaskBatches)); $stationTaskBatches_failed = collect(); LogService::log(__METHOD__,'runMany','波次任务分配6.4:'.json_encode($stationTaskBatches)); foreach ($stationTaskBatches as $stationTaskBatch) { LogService::log(__METHOD__,'runMany','波次任务分配6.5:'.json_encode($stationTaskBatches)); $failed = !$this->run($stationTaskBatch); LogService::log(__METHOD__,'runMany','波次任务分配6.6:'.json_encode($stationTaskBatches)); if ($failed) $stationTaskBatches_failed->push($stationTaskBatch); } })($stationTaskBatches, $stationTaskBatches_failed); ($logAtFailings_andWait = function ($stationTaskBatches_failed) { LogService::log(__METHOD__,'runMany','波次任务分配6.7:'.json_encode($stationTaskBatches_failed)); if ($stationTaskBatches_failed->isEmpty()) return; LogService::log(__METHOD__,'runMany','波次任务分配6.8:'.json_encode($stationTaskBatches_failed)); $retry_after_sec = config('task.batchTask.retry_after_sec'); LogService::log(__METHOD__, __FUNCTION__, '任务波次没有执行完的,' . $retry_after_sec . '秒后准备重试:' . $stationTaskBatches_failed->toJson() . '调用堆栈r:' . json_encode(array_slice(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS), 0, 3)) ); LogService::log(__METHOD__,'runMany','波次任务分配6.9:'.json_encode($stationTaskBatches_failed)); sleep($retry_after_sec); LogService::log(__METHOD__,'runMany','波次任务分配6.91:'.json_encode($stationTaskBatches_failed)); })($stationTaskBatches_failed); LogService::log(__METHOD__,'runMany','波次任务分配6.92:'.json_encode($stationTaskBatches)); $execute ($stationTaskBatches_failed, $stationTaskBatches_failed); //再次尝试 LogService::log(__METHOD__,'runMany','波次任务分配6.93:'.json_encode($stationTaskBatches)); $this->markManyExcepted ($stationTaskBatches_failed); LogService::log(__METHOD__,'runMany','波次任务分配6.94:'.json_encode($stationTaskBatches)); return $stationTaskBatches_failed; } function run(StationTaskBatch $stationTaskBatch): bool { LogService::log(__METHOD__,'runMany','波次任务分配6.r1:'.json_encode($stationTaskBatch)); $this->instant($this->foreignHaiRoboticsService,'ForeignHaiRoboticsService'); $this->instant($this->stationService,'StationService'); $stationTaskBatch->loadMissing(['station','stationTask.stationTaskMaterialBoxes']); LogService::log(__METHOD__,'runMany','波次任务分配6.r2:'.json_encode($stationTaskBatch)); $toLocation = $this->stationService->getULineEntrance($stationTaskBatch['station'])['code']; LogService::log(__METHOD__,'runMany','波次任务分配6.r3:'.json_encode($stationTaskBatch)); $groupPrefix = $stationTaskBatch['id']; $taskMaterialBoxes = $stationTaskBatch['stationTask']['stationTaskMaterialBoxes'] ?? (function () use ($stationTaskBatch) { LogService::log(__METHOD__,'runMany','波次任务分配6.r4:'.json_encode($stationTaskBatch)); throw new Exception('找不到料箱:' . json_encode($stationTaskBatch)); })(); try{ LogService::log(__METHOD__,'runMany','波次任务分配6.r5:'.json_encode($stationTaskBatch)); $isFetchedFromRobotics = $this->foreignHaiRoboticsService-> fetchGroup($toLocation, $taskMaterialBoxes, $groupPrefix); LogService::log(__METHOD__,'runMany','波次任务分配6.r6:'.json_encode($stationTaskBatch)); }catch(Exception $e){ throw new ErrorException('$stationTaskBatch运行波次机器人任务失败,获取组失败: '.$stationTaskBatch->toJson() . $e->getMessage()); } LogService::log(__METHOD__,'runMany','波次任务分配6.r7:'.json_encode($stationTaskBatch)); ($markNewStatus =function()use($isFetchedFromRobotics,$stationTaskBatch){ $isFetchedFromRobotics? $this->markProcessing($stationTaskBatch): $this->markExcepted($stationTaskBatch); LogService::log(__METHOD__,'runMany','波次任务分配6.r8:'.json_encode($stationTaskBatch)); })(); LogService::log(__METHOD__,'runMany','波次任务分配6.r9:'.json_encode($stationTaskBatch)); return $isFetchedFromRobotics; } function markProcessing($stationTaskBatch_orCollection) { if (get_class($stationTaskBatch_orCollection)==StationTaskBatch::class){ $stationTaskBatch_orCollection = collect([$stationTaskBatch_orCollection]); } $this->markProcessing_byIds(data_get($stationTaskBatch_orCollection, '*.id')); } function markProcessing_byIds($ids) { if(!$ids)$ids=[]; if(!is_array($ids))$ids=[$ids]; $hasProcessing=StationTaskBatch::query() ->whereNotIn('id', $ids) ->where(['status'=>'处理中']) ->where('created_at','>',Carbon::now()->subDay()) ->get('id')->isNotEmpty(); $status = '处理中'; if($hasProcessing) $status = '处理队列'; StationTaskBatch::query() ->whereIn('id', $ids) ->update(['status'=>$status]); } function markProcessed($stationTaskBatch_orCollection) { if (get_class($stationTaskBatch_orCollection)==StationTaskBatch::class){ $stationTaskBatch_orCollection = collect([$stationTaskBatch_orCollection]); } $this->markProcessed_byIds(data_get($stationTaskBatch_orCollection, '*.id')); } function markProcessed_byIds($ids) { if(!$ids)$ids=[]; if(!is_array($ids))$ids=[$ids]; StationTaskBatch::query() ->whereIn('id', $ids) ->update(['status'=>'完成']); } // function markFinished($stationTaskBatches) // { // if (get_class($stationTaskBatches)==StationTaskBatch::class){ // $stationTaskBatches = collect($stationTaskBatches); // } // StationTaskBatch::query() // ->whereIn('id', data_get($stationTaskBatches, '*.id')) // ->update(['status'=>'完成']); // // $this->stationTaskService // ->markProcessing_byId( // data_get($stationTaskBatches, '*.station_id') // ); // } function markExcepted(StationTaskBatch $stationTaskBatch) { $stationTaskBatch['status'] = '异常'; $stationTaskBatch ->update(); } }