CacheShelfTaskJob.php 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. <?php
  2. namespace App\Jobs;
  3. use App\Components\ErrorPush;
  4. use App\Services\ForeignHaiRoboticsService;
  5. use App\Station;
  6. use App\StationTaskMaterialBox;
  7. use App\TaskTransaction;
  8. use Illuminate\Bus\Queueable;
  9. use Illuminate\Contracts\Queue\ShouldQueue;
  10. use Illuminate\Database\Eloquent\Builder;
  11. use Illuminate\Foundation\Bus\Dispatchable;
  12. use Illuminate\Queue\InteractsWithQueue;
  13. use Illuminate\Support\Collection;
  14. use Illuminate\Support\Facades\Cache;
  15. use Illuminate\Support\Facades\DB;
  16. class CacheShelfTaskJob implements ShouldQueue
  17. {
  18. use Dispatchable, InteractsWithQueue, Queueable, ErrorPush;
  19. protected $key;
  20. protected $count;
  21. /**
  22. * Create a new job instance.
  23. *
  24. * @return void
  25. */
  26. public function __construct(string $key,int $count)
  27. {
  28. $this->count = $count;
  29. $this->key = $key;
  30. }
  31. /**
  32. * Execute the job.
  33. *
  34. * @return void
  35. * @throws
  36. */
  37. public function handle()
  38. {
  39. /** @var ForeignHaiRoboticsService $service */
  40. $service = app("ForeignHaiRoboticsService");
  41. switch ($this->key){
  42. case "CACHE_SHELF_AVAILABLE"://缓存架释放呼叫
  43. //等待一定时间来合并同类请求至此
  44. $available = Cache::get($this->key,function (){return [];});
  45. if ($this->count!==count($available))return;
  46. //检查事务 尝试分发任务 改变下方序列来控制分发顺序 逐级分发 一次成功就终止
  47. if ($this->dispatchOutTask($available,$service))break; //首先尝试向出库事务分发 分发成功跳出
  48. if ($this->dispatchInTask($available,$service))break; //尝试向入库事务分发
  49. break;
  50. default://入库呼叫
  51. if (!Cache::has($this->key))return;
  52. /** @var Collection $task */
  53. list($task,$location) = Cache::get($this->key);
  54. if ($this->count!==$task->count())return;
  55. $dataToPost = $service->makeJson_move_multi($task, '缓存架入立架', $location);
  56. $controlSuccess = $service->controlHaiRobot($dataToPost,$task,'缓存架入立架');
  57. $tIds = [];
  58. $task->each(function ($t)use(&$tIds){$tIds[] = $t->id;});
  59. StationTaskMaterialBox::query()->where("id",$tIds)
  60. ->where("status","待处理")->update(['status' => $controlSuccess ? '处理中' : '异常']);
  61. Cache::forget($this->key);
  62. if ($controlSuccess)$this->materialBoxMappingCacheShelf($task,$location);
  63. }
  64. }
  65. /**
  66. * 料箱映射缓存架,因为建立的入立架任务源库位是立库,无法获取真实源库位,所以在此拿到映射库位
  67. * 在料箱被取走时通过任务料箱号获取对应库位,来标记该缓存架库位可以被执行任务了 StationTaskMaterialBoxService:markHasTaken
  68. * 出库会启用库位占用逻辑 入库分为:人工入库与系统自动入库 人工控制入库由人工自己辨识库位可用度,而系统入库只能通过此映射来拿到可用库位信息
  69. *
  70. * @param Collection $task
  71. * @param Collection $location
  72. *
  73. * @return void
  74. */
  75. public function materialBoxMappingCacheShelf(Collection $task,Collection $location)
  76. {
  77. $map = Cache::get("CACHE_SHELF_MAPPING",function (){return [];});
  78. foreach ($task as $key=>$obj)$map[$obj->material_box_id] = $location[$key];
  79. Cache::forever("CACHE_SHELF_MAPPING",$map);
  80. }
  81. /**
  82. * 分发出库任务
  83. *
  84. * @param array $available
  85. * @param $service
  86. * @return bool
  87. */
  88. private function dispatchOutTask(array $available, $service):bool
  89. {
  90. DB::beginTransaction();
  91. try {
  92. $tasks = TaskTransaction::query()->selectRaw("task_id,GROUP_CONCAT(id) AS ids,id")->with("task")
  93. ->where("type","出库")->whereHas("task",function ($query){
  94. $query->where("status","待处理");
  95. })->where("status",3)->lockForUpdate()
  96. ->where("mark",2)->groupBy("task_id")->get(); //检索等待的队列事务来获取对应任务
  97. if ($this->dispatchTask($tasks,$available,$service,function ($obj,$stationId,$time,&$updateTransaction){
  98. if ($obj->ids!=$obj->id){
  99. $ids = explode(",",$obj->ids);
  100. foreach ($ids as $id)$updateTransaction[] = ["id"=>$id,"to_station_id"=>$stationId,"status"=>0,"updated_at"=>$time];
  101. }else $updateTransaction[] = ["id"=>$obj->id,"to_station_id"=>$stationId,"status"=>0,"updated_at"=>$time];
  102. },function ($service,$toLocation,$task,$prefix){
  103. return $service->fetchGroup_multiLocation($toLocation,$task,$prefix,'立架出至缓存架',20);
  104. },"to_station_id")){DB::commit();return true;}
  105. DB::rollBack();
  106. }catch (\Exception $e){
  107. DB::rollBack();
  108. $this->push(__METHOD__."->".__LINE__,"出库队列执行错误",$e->getMessage()." | 当前任务数:".$this->count." | 缓存信息:".(isset($available) ? json_encode($available) : ''));
  109. }
  110. return false;
  111. }
  112. /**
  113. * 分发入库任务
  114. *
  115. * @param array $available
  116. * @param $service
  117. * @return bool
  118. */
  119. private function dispatchInTask(array $available, $service):bool
  120. {
  121. DB::beginTransaction();
  122. try {
  123. $tasks = TaskTransaction::query()->with("task")
  124. ->where("type","入库")->whereHas("task",function ($query){
  125. $query->where("status","待处理");
  126. })->where("status",3)->lockForUpdate()
  127. ->where("mark",1)->get(); //检索等待的队列事务来获取对应任务
  128. if ($this->dispatchTask($tasks,$available,$service,function ($obj,$stationId,$time,&$updateTransaction){
  129. $updateTransaction[] = ["id"=>$obj->id,"fm_station_id"=>$stationId,"status"=>0,"updated_at"=>$time];
  130. },function ($service,$toLocation,$task,$prefix){
  131. return $service->fetchGroup_multiLocation($toLocation,$task,'','立架出至缓存架');
  132. },"fm_station_id")){DB::commit();return true;}
  133. DB::rollBack();
  134. }catch (\Exception $e){
  135. DB::rollBack();
  136. $this->push(__METHOD__."->".__LINE__,"入库队列执行错误",$e->getMessage()." | 当前任务数:".$this->count." | 缓存信息:".(isset($available) ? json_encode($available) : ''));
  137. }
  138. return false;
  139. }
  140. private function dispatchTask(\Illuminate\Database\Eloquent\Collection $tasks, array $available, $service,
  141. \Closure $update, \Closure $execute, string $stationName):bool
  142. {
  143. if (!$tasks->count())return false;
  144. if ($tasks->count()>count($available))$tasks = $tasks->slice(0,count($available));//事务过多切割部分处理
  145. $toLocation = collect();
  146. $task = collect();
  147. $availableTemp = array_keys($available);
  148. $map = app("StationService")->getStationMapping($availableTemp);//获取库位映射信息
  149. $updateTask = [["id","station_id","updated_at"]];
  150. $updateTransaction = [["id",$stationName,"status","updated_at"]];
  151. $time = date("Y-m-d H:i:s");
  152. foreach ($tasks as $index=>$obj){
  153. $loc = $availableTemp[$index];
  154. $toLocation->push($loc);
  155. $obj->task->station_id = $map[$loc];
  156. $task->push($obj->task);
  157. unset($available[$loc]);
  158. $updateTask[] = ["id"=>$obj->task->id,"station_id"=>$map[$loc],"updated_at"=>$time];
  159. $update($obj,$map[$loc],$time,$updateTransaction);
  160. }
  161. app("BatchUpdateService")->batchUpdate("station_task_material_boxes",$updateTask);
  162. app("BatchUpdateService")->batchUpdate("task_transactions",$updateTransaction);
  163. if ($execute($service,$toLocation,$task,$tasks[0]->station_task_batch_id)){
  164. Cache::forever($this->key,$available);
  165. foreach ($toLocation as $value){
  166. app("CacheShelfService")->lightUp($value,'3','0');
  167. Cache::forever("CACHE_SHELF_OCCUPANCY_{$map[$value]}",true);
  168. }
  169. app("StationService")->locationOccupyMulti($toLocation->toArray());
  170. DB::commit();
  171. return true;
  172. }
  173. DB::rollBack();
  174. $this->push(__METHOD__."->".__LINE__,"缓存队列执行错误","库位信息:".json_encode($toLocation)." 任务信息:".json_encode($task));
  175. return false;
  176. }
  177. }