CacheShelfTaskJob.php 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. <?php
  2. namespace App\Jobs;
  3. use App\Components\ErrorPush;
  4. use App\Services\ForeignHaiRoboticsService;
  5. use App\Services\LogService;
  6. use App\Station;
  7. use App\StationTaskMaterialBox;
  8. use App\TaskTransaction;
  9. use Illuminate\Bus\Queueable;
  10. use Illuminate\Contracts\Queue\ShouldQueue;
  11. use Illuminate\Database\Eloquent\Builder;
  12. use Illuminate\Foundation\Bus\Dispatchable;
  13. use Illuminate\Queue\InteractsWithQueue;
  14. use Illuminate\Support\Collection;
  15. use Illuminate\Support\Facades\Cache;
  16. use Illuminate\Support\Facades\DB;
  17. use Illuminate\Support\Facades\Log;
  18. class CacheShelfTaskJob implements ShouldQueue
  19. {
  20. use Dispatchable, InteractsWithQueue, Queueable, ErrorPush;
  21. protected $key;
  22. protected $count;
  23. /**
  24. * Create a new job instance.
  25. *
  26. * @return void
  27. */
  28. public function __construct(string $key,int $count)
  29. {
  30. $this->count = $count;
  31. $this->key = $key;
  32. }
  33. /**
  34. * Execute the job.
  35. *
  36. * @return void
  37. * @throws
  38. */
  39. public function handle()
  40. {
  41. /** @var ForeignHaiRoboticsService $service */
  42. $service = app("ForeignHaiRoboticsService");
  43. switch ($this->key){
  44. case "CACHE_SHELF_AVAILABLE"://缓存架释放呼叫
  45. //等待一定时间来合并同类请求至此
  46. $available = Cache::get($this->key,0);
  47. Log::debug("队列事务1",["当前数量:{$this->count}","缓存数量:$available"]);
  48. if ($this->count!=$available)return;
  49. Cache::forget($this->key); //无论是否开始分发 都清除本次缓存架的计数器
  50. //获取可用缓存架
  51. $stations = app("StationService")->getCacheShelf(true);
  52. Log::debug("队列事务2",["可用缓存架:{$stations->count()}"]);
  53. if ($stations->count()==0)break;
  54. //检查事务 尝试分发任务 改变下方序列来控制分发顺序 逐级分发 一次成功就终止
  55. if ($this->dispatchOutTask($stations,$service))break; //首先尝试向出库事务分发 分发成功跳出
  56. if ($this->dispatchInTask($stations,$service))break; //尝试向入库事务分发
  57. break;
  58. default://入库呼叫
  59. if (!Cache::has($this->key))return;
  60. /** @var Collection $task */
  61. list($task,$location) = Cache::get($this->key);
  62. if ($this->count!=$task->count())return;
  63. $dataToPost = $service->makeJson_move_multi($task, '缓存架入立架', $location);
  64. $controlSuccess = $service->controlHaiRobot($dataToPost,$task,'缓存架入立架');
  65. $tIds = [];
  66. $task->each(function ($t)use(&$tIds){$tIds[] = $t->id;});
  67. StationTaskMaterialBox::query()->whereIn("id",$tIds)
  68. ->where("status","待处理")->update(['status' => $controlSuccess ? '处理中' : '异常']);
  69. Cache::forget($this->key);
  70. //if ($controlSuccess)$this->materialBoxMappingCacheShelf($task,$location);
  71. }
  72. }
  73. /**
  74. * 料箱映射缓存架,因为建立的入立架任务源库位是立库,无法获取真实源库位,所以在此拿到映射库位
  75. * 在料箱被取走时通过任务料箱号获取对应库位,来标记该缓存架库位可以被执行任务了 StationTaskMaterialBoxService:markHasTaken
  76. * 出库会启用库位占用逻辑 入库分为:人工入库与系统自动入库 人工控制入库由人工自己辨识库位可用度,而系统入库只能通过此映射来拿到可用库位信息
  77. *
  78. * @param Collection $task
  79. * @param Collection $location
  80. *
  81. * @return void
  82. */
  83. public function materialBoxMappingCacheShelf(Collection $task,Collection $location)
  84. {
  85. $map = Cache::get("CACHE_SHELF_MAPPING",function (){return [];});
  86. foreach ($task as $key=>$obj)$map[$obj->material_box_id] = $location[$key];
  87. Cache::forever("CACHE_SHELF_MAPPING",$map);
  88. }
  89. /**
  90. * 分发出库任务
  91. *
  92. * @param $stations
  93. * @param $service
  94. * @return bool
  95. */
  96. private function dispatchOutTask(&$stations,$service):bool
  97. {
  98. DB::beginTransaction();
  99. try {
  100. $tasks = TaskTransaction::query()->selectRaw("task_id,GROUP_CONCAT(id) AS ids,id")->with("task")
  101. ->where("type","出库")->whereHas("task",function ($query){
  102. $query->where("status","待处理");
  103. })->where("status",3)->lockForUpdate()
  104. ->where("mark",2)->groupBy("task_id")->get(); //检索等待的队列事务来获取对应任务
  105. Log::debug("队列事务3",["队列出库任务:{$tasks->count()}"]);
  106. if ($tasks->count()==0)return false;
  107. if ($this->dispatchTask($tasks,$stations,$service,function ($obj,$stationId,$time,&$updateTransaction){
  108. if ($obj->ids!=$obj->id){
  109. $ids = explode(",",$obj->ids);
  110. foreach ($ids as $id)$updateTransaction[] = ["id"=>$id,"to_station_id"=>$stationId,"status"=>0,"updated_at"=>$time];
  111. }else $updateTransaction[] = ["id"=>$obj->id,"to_station_id"=>$stationId,"status"=>0,"updated_at"=>$time];
  112. },function ($service,$toLocation,$task,$prefix){
  113. return $service->fetchGroup_multiLocation($toLocation,$task,$prefix,'立架出至缓存架',20,false);
  114. },"to_station_id")){
  115. Log::debug("队列事务4",["缓存架剩余数量:{$stations->count()}"]);
  116. DB::commit();return $stations->count()==0;} //缓存架用完 跳出,否则接着分发
  117. DB::rollBack();
  118. }catch (\Exception $e){
  119. DB::rollBack();
  120. $this->push(__METHOD__."->".__LINE__,"出库队列执行错误",$e->getMessage()." | 当前任务数:".$this->count." | 缓存信息:".(isset($available) ? json_encode($available) : ''));
  121. }
  122. return false;
  123. }
  124. /**
  125. * 分发入库任务
  126. *
  127. * @param $stations
  128. * @param $service
  129. * @return bool
  130. */
  131. private function dispatchInTask(&$stations,$service):bool
  132. {
  133. DB::beginTransaction();
  134. try {
  135. $tasks = TaskTransaction::query()->with("task")
  136. ->where("type","入库")->whereHas("task",function ($query){
  137. $query->where("status","待处理");
  138. })->where("status",3)->lockForUpdate()
  139. ->where("mark",1)->get(); //检索等待的队列事务来获取对应任务
  140. Log::debug("队列事务3",["队列入库任务:{$tasks->count()}"]);
  141. if ($tasks->count()==0)return false;
  142. if ($this->dispatchTask($tasks,$stations,$service,function ($obj,$stationId,$time,&$updateTransaction){
  143. $updateTransaction[] = ["id"=>$obj->id,"fm_station_id"=>$stationId,"status"=>0,"updated_at"=>$time];
  144. },function ($service,$toLocation,$task,$prefix){
  145. return $service->fetchGroup_multiLocation($toLocation,$task,'','立架出至缓存架',20,false);
  146. },"fm_station_id")){
  147. Log::debug("队列事务5",["缓存架剩余数量:{$stations->count()}"]);
  148. DB::commit();return $stations->count()==0; //缓存架用完 跳出,否则接着分发
  149. }
  150. DB::rollBack();
  151. }catch (\Exception $e){
  152. DB::rollBack();
  153. $this->push(__METHOD__."->".__LINE__,"入库队列执行错误",$e->getMessage()." | 当前任务数:".$this->count." | 缓存信息:".(isset($available) ? json_encode($available) : ''));
  154. }
  155. return false;
  156. }
  157. private function dispatchTask(\Illuminate\Database\Eloquent\Collection $tasks,&$stations, $service,
  158. \Closure $update, \Closure $execute, string $stationName):bool
  159. {
  160. $locations = $stations;
  161. if ($tasks->count()>$locations->count())$tasks = $tasks->slice(0,$locations->count());//事务过多切割部分处理
  162. if ($tasks->count()<$locations->count()){
  163. $stations = $stations->slice($tasks->count());
  164. $stations = $stations->values($stations);
  165. }
  166. $toLocation = collect();
  167. $task = collect();
  168. $updateTask = [["id","station_id","updated_at"]];
  169. $updateTransaction = [["id",$stationName,"status","updated_at"]];
  170. $time = date("Y-m-d H:i:s");
  171. $map = [];
  172. foreach ($tasks as $index=>$obj){
  173. $toLocation->push($stations[$index]->code);
  174. $map[$stations[$index]->code] = $stations[$index]->id;
  175. $obj->task->station_id = $stations[$index]->id;
  176. $task->push($obj->task);
  177. $updateTask[] = ["id"=>$obj->task->id,"station_id"=>$stations[$index]->id,"updated_at"=>$time];
  178. $update($obj,$stations[$index]->id,$time,$updateTransaction);
  179. }
  180. app("BatchUpdateService")->batchUpdate("station_task_material_boxes",$updateTask);
  181. app("BatchUpdateService")->batchUpdate("task_transactions",$updateTransaction);
  182. if ($execute($service,$toLocation,$task,$tasks[0]->station_task_batch_id ?: '')){
  183. foreach ($toLocation as $value){
  184. app("CacheShelfService")->lightUp($value,'3','0',["title"=>"机器人取箱中,禁止操作"]);
  185. Cache::forever("CACHE_SHELF_OCCUPANCY_{$map[$value]}",true);
  186. }
  187. app("StationService")->locationOccupyMulti($toLocation->toArray());
  188. DB::commit();
  189. return true;
  190. }
  191. DB::rollBack();
  192. $this->push(__METHOD__."->".__LINE__,"缓存队列执行错误","库位信息:".json_encode($toLocation)." 任务信息:".json_encode($task));
  193. return false;
  194. }
  195. }