count = $count; $this->key = $key; } /** * Execute the job. * * @return void * @throws */ public function handle() { /** @var ForeignHaiRoboticsService $service */ $service = app("ForeignHaiRoboticsService"); switch ($this->key){ case "CACHE_SHELF_AVAILABLE"://缓存架释放呼叫 //等待一定时间来合并同类请求至此 $available = Cache::get($this->key,function (){return [];}); if ($this->count!==count($available))return; //检查事务 尝试分发任务 改变下方序列来控制分发顺序 逐级分发 一次成功就终止 if ($this->dispatchOutTask($available,$service))break; //首先尝试向出库事务分发 分发成功跳出 if ($this->dispatchInTask($available,$service))break; //尝试向入库事务分发 break; default://入库呼叫 if (!Cache::has($this->key))return; /** @var Collection $task */ list($task,$location) = Cache::get($this->key); if ($this->count!==$task->count())return; $dataToPost = $service->makeJson_move_multi($task, '缓存架入立架', $location); $controlSuccess = $service->controlHaiRobot($dataToPost,$task,'缓存架入立架'); $tIds = []; $task->each(function ($t)use(&$tIds){$tIds[] = $t->id;}); StationTaskMaterialBox::query()->where("id",$tIds) ->where("status","待处理")->update(['status' => $controlSuccess ? '处理中' : '异常']); Cache::forget($this->key); if ($controlSuccess)$this->materialBoxMappingCacheShelf($task,$location); } } /** * 料箱映射缓存架,因为建立的入立架任务源库位是立库,无法获取真实源库位,所以在此拿到映射库位 * 在料箱被取走时通过任务料箱号获取对应库位,来标记该缓存架库位可以被执行任务了 StationTaskMaterialBoxService:markHasTaken * 出库会启用库位占用逻辑 入库分为:人工入库与系统自动入库 人工控制入库由人工自己辨识库位可用度,而系统入库只能通过此映射来拿到可用库位信息 * * @param Collection $task * @param Collection $location * * @return void */ public function materialBoxMappingCacheShelf(Collection $task,Collection $location) { $map = Cache::get("CACHE_SHELF_MAPPING",function (){return [];}); foreach ($task as $key=>$obj)$map[$obj->material_box_id] = $location[$key]; Cache::forever("CACHE_SHELF_MAPPING",$map); } /** * 分发出库任务 * * @param array $available * @param $service * @return bool */ private function dispatchOutTask(array $available, $service):bool { DB::beginTransaction(); try { $tasks = TaskTransaction::query()->selectRaw("task_id,GROUP_CONCAT(id) AS ids,id")->with("task") ->where("type","出库")->whereHas("task",function ($query){ $query->where("status","待处理"); })->where("status",3)->lockForUpdate() ->where("mark",2)->groupBy("task_id")->get(); //检索等待的队列事务来获取对应任务 if ($this->dispatchTask($tasks,$available,$service,function ($obj,$stationId,$time,&$updateTransaction){ if ($obj->ids!=$obj->id){ $ids = explode(",",$obj->ids); foreach ($ids as $id)$updateTransaction[] = ["id"=>$id,"to_station_id"=>$stationId,"status"=>0,"updated_at"=>$time]; }else $updateTransaction[] = ["id"=>$obj->id,"to_station_id"=>$stationId,"status"=>0,"updated_at"=>$time]; },function ($service,$toLocation,$task,$prefix){ return $service->fetchGroup_multiLocation($toLocation,$task,$prefix,'立架出至缓存架',20); },"to_station_id")){DB::commit();return true;} DB::rollBack(); }catch (\Exception $e){ DB::rollBack(); $this->push(__METHOD__."->".__LINE__,"出库队列执行错误",$e->getMessage()." | 当前任务数:".$this->count." | 缓存信息:".(isset($available) ? json_encode($available) : '')); } return false; } /** * 分发入库任务 * * @param array $available * @param $service * @return bool */ private function dispatchInTask(array $available, $service):bool { DB::beginTransaction(); try { $tasks = TaskTransaction::query()->with("task") ->where("type","入库")->whereHas("task",function ($query){ $query->where("status","待处理"); })->where("status",3)->lockForUpdate() ->where("mark",1)->get(); //检索等待的队列事务来获取对应任务 if ($this->dispatchTask($tasks,$available,$service,function ($obj,$stationId,$time,&$updateTransaction){ $updateTransaction[] = ["id"=>$obj->id,"fm_station_id"=>$stationId,"status"=>0,"updated_at"=>$time]; },function ($service,$toLocation,$task,$prefix){ return $service->fetchGroup_multiLocation($toLocation,$task,'','立架出至缓存架'); },"fm_station_id")){DB::commit();return true;} DB::rollBack(); }catch (\Exception $e){ DB::rollBack(); $this->push(__METHOD__."->".__LINE__,"入库队列执行错误",$e->getMessage()." | 当前任务数:".$this->count." | 缓存信息:".(isset($available) ? json_encode($available) : '')); } return false; } private function dispatchTask(\Illuminate\Database\Eloquent\Collection $tasks, array $available, $service, \Closure $update, \Closure $execute, string $stationName):bool { if (!$tasks->count())return false; if ($tasks->count()>count($available))$tasks = $tasks->slice(0,count($available));//事务过多切割部分处理 $toLocation = collect(); $task = collect(); $availableTemp = array_keys($available); $map = app("StationService")->getStationMapping($availableTemp);//获取库位映射信息 $updateTask = [["id","station_id","updated_at"]]; $updateTransaction = [["id",$stationName,"status","updated_at"]]; $time = date("Y-m-d H:i:s"); foreach ($tasks as $index=>$obj){ $loc = $availableTemp[$index]; $toLocation->push($loc); $obj->task->station_id = $map[$loc]; $task->push($obj->task); unset($available[$loc]); $updateTask[] = ["id"=>$obj->task->id,"station_id"=>$map[$loc],"updated_at"=>$time]; $update($obj,$map[$loc],$time,$updateTransaction); } app("BatchUpdateService")->batchUpdate("station_task_material_boxes",$updateTask); app("BatchUpdateService")->batchUpdate("task_transactions",$updateTransaction); if ($execute($service,$toLocation,$task,$tasks[0]->station_task_batch_id)){ Cache::forever($this->key,$available); foreach ($toLocation as $value){ app("CacheShelfService")->lightUp($value,'3','0'); Cache::forever("CACHE_SHELF_OCCUPANCY_{$map[$value]}",true); } app("StationService")->locationOccupyMulti($toLocation->toArray()); DB::commit(); return true; } DB::rollBack(); $this->push(__METHOD__."->".__LINE__,"缓存队列执行错误","库位信息:".json_encode($toLocation)." 任务信息:".json_encode($task)); return false; } }