SyncBatchTask.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Exceptions\ErrorException;
  4. use App\Exceptions\Exception;
  5. use App\Order;
  6. use App\OrderBin;
  7. use App\Services\BatchService;
  8. use App\Services\common\BatchUpdateService;
  9. use App\Services\DocWaveHeaderService;
  10. use App\Services\LogService;
  11. use App\ValueStore;
  12. use Carbon\Carbon;
  13. use Illuminate\Console\Command;
  14. use Illuminate\Support\Facades\DB;
  15. class SyncBatchTask extends Command
  16. {
  17. protected $signature = 'sync:batch';
  18. protected $description = 'sync wms batch task';
  19. /** @var DocWaveHeaderService $service */
  20. private $service;
  21. /** @var BatchService $batchService */
  22. private $batchService;
  /**
   * Build the command and resolve its two collaborating services from the
   * application container: the WMS wave-header reader and the local batch service.
   */
  public function __construct()
  {
      parent::__construct();

      $container = app();
      $this->service = $container->make(DocWaveHeaderService::class);
      $this->batchService = $container->make(BatchService::class);
  }
  /**
   * Command entry point.
   *
   * Waits a random 0-10 seconds (presumably to stagger runs started at the same
   * moment - confirm with the scheduler setup), then synchronises wave headers
   * followed by wave details.
   */
  public function handle()
  {
      $delaySeconds = rand(0, 10);
      sleep($delaySeconds);

      $this->disposeHeader();
      $this->disposeDetail();
  }
  /**
   * Mirror WMS wave headers (Oracle DOC_WAVE_HEADER) edited since the sync cursor.
   *
   * When no start date is given, the "wave_last_sync_date" ValueStore row is read
   * under a row lock; if it has no value the window starts 65 seconds ago.
   * While more than 500 rows are pending they are processed in EDITTIME-ordered
   * pages of 500; the remainder is fetched through the header service in one go.
   *
   * The paging is a loop inside a single transaction. The filter is inclusive
   * (EDITTIME >= cursor), so a page whose last row carries the same timestamp as
   * the cursor makes no progress; the previous recursive form would then call
   * itself forever. In that case paging stops and the unpaged fetch handles the
   * rest.
   *
   * @param string|null $date Start of the window ('Y-m-d H:i:s'); null to use the stored cursor.
   * @return void
   */
  private function disposeHeader($date = null)
  {
      DB::transaction(function () use ($date) {
          // Fetch the last sync time (locking the cursor row) when the caller gave none.
          if (!$date) {
              $valueStore = ValueStore::query()->where("name", "wave_last_sync_date")->lockForUpdate()->first();
              $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
          }

          $pageSize = 500;
          $filter = "EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')";
          $countSql = "SELECT COUNT(*) count FROM DOC_WAVE_HEADER WHERE " . $filter;
          $pageSql = "SELECT * FROM (SELECT header.*, ROWNUM AS rowno FROM ("
              . "SELECT * FROM DOC_WAVE_HEADER WHERE " . $filter . " ORDER BY EDITTIME"
              . ")header WHERE ROWNUM <= " . $pageSize . ")wave WHERE wave.rowno >= 0";
          $oracle = DB::connection("oracle");

          while (true) {
              $count = $oracle->selectOne(DB::raw($countSql), [$date]);
              if ($count->count <= $pageSize) {
                  break;
              }

              $waves = $oracle->select(DB::raw($pageSql), [$date]);
              if (count($waves) < 1) {
                  // Rows disappeared between the count and the fetch; nothing to page through.
                  break;
              }
              $this->headerExe($waves);

              $next = $waves[count($waves) - 1]->edittime;
              $nextTs = strtotime((string)$next);
              $stalled = (string)$next === (string)$date
                  || ($nextTs !== false && $nextTs === strtotime((string)$date));
              if ($stalled) {
                  // A whole page shares the cursor's timestamp: paging cannot advance.
                  break;
              }
              $date = $next;
          }

          // Remaining rows (at most one page unless paging stalled) are fetched unpaged.
          $waves = $this->service->get(["edittime" => $date], ["edittime" => "gtOrEqual"]);
          $this->headerExe($waves);
      });
  }
  /**
   * Apply a set of WMS wave-header rows to the local "batches" table.
   *
   * Rows whose waveno already exists locally are updated when wms_status or
   * wms_type differs; unknown wavenos are inserted with status '未处理'.
   *
   * @param array|\Traversable $waves Countable, index-addressable list of wave rows
   *                                  (reads waveno, descr, customerid, addtime, edittime, wavestatus).
   * @param bool $advanceCursor When true (default) the "wave_last_sync_date" cursor is
   *                            moved to the last row's edittime. Pass false for rows that
   *                            were not selected by the header sync window, so the header
   *                            cursor is not moved to an unrelated timestamp.
   * @return void
   */
  private function headerExe($waves, $advanceCursor = true)
  {
      if (count($waves) < 1) {
          return;
      }

      // Load the local rows for these waves so differences can be detected.
      $codes = [];
      foreach ($waves as $wave) {
          $codes[] = $wave->waveno;
      }
      $map = [];
      $batches = $this->batchService->get(["code" => $codes]);
      if ($batches) {
          foreach ($batches as $index => $batch) {
              $map[$batch->code] = $index;
          }
      }

      // First element is the column header row expected by BatchUpdateService.
      $update = [["id", "wms_status", "wms_type", "updated_at"]];
      $insert = [];
      foreach ($waves as $wave) {
          $wms_status = $this->wms_status($wave);
          if (isset($map[$wave->waveno])) {
              $bat = $batches[$map[$wave->waveno]];
              if ($bat->wms_status != $wms_status || $bat->wms_type != $wave->descr) {
                  $update[] = [
                      "id" => $bat->id,
                      "wms_status" => $wms_status,
                      "wms_type" => $wave->descr,
                      "updated_at" => $wave->edittime,
                  ];
              }
              continue;
          }
          // NOTE(review): assumes codeGetOwner() always returns an owner object - confirm.
          $owner = app("OwnerService")->codeGetOwner($wave->customerid);
          $insert[] = [
              "code" => $wave->waveno,
              "wms_type" => $wave->descr,
              "status" => '未处理',
              "wms_status" => $wms_status,
              "created_at" => $wave->addtime,
              "updated_at" => $wave->edittime,
              "owner_id" => $owner->id,
          ];
      }

      // Update the waves that already exist locally.
      if (count($update) > 1) {
          $bool = app(BatchUpdateService::class)->batchUpdate("batches", $update);
          if ($bool !== false) LogService::log(__METHOD__, "SUCCESS-同步更新波次成功", json_encode($update));
          else LogService::log(__METHOD__, "ERROR-同步更新波次失败", json_encode($update));
      }
      // Insert the waves that do not exist yet.
      if ($insert) {
          $this->batchService->insert($insert);
          LogService::log(__METHOD__, "SUCCESS-同步插入波次成功", json_encode($insert));
      }

      if ($advanceCursor) {
          ValueStore::query()->where("name", "wave_last_sync_date")->update(["value" => $waves[count($waves) - 1]->edittime]);
      }
  }

  /**
   * Mirror WMS wave details (Oracle DOC_WAVE_DETAILS) edited since the sync cursor.
   *
   * When no start date is given, the "wave_detail_last_sync_date" ValueStore row is
   * read under a row lock; if it has no value the window starts 65 seconds ago.
   * While more than 1000 rows are pending they are processed in EDITTIME-ordered
   * pages of 1000; the remainder is fetched in one final query.
   *
   * The paging is a loop inside a single transaction. The filter is inclusive
   * (EDITTIME >= cursor), so a page whose last row carries the cursor's own
   * timestamp makes no progress; paging then stops and the final unpaged query
   * handles the rest instead of re-reading the same page forever.
   *
   * @param string|null $date Start of the window ('Y-m-d H:i:s'); null to use the stored cursor.
   * @return void
   */
  public function disposeDetail($date = null)
  {
      DB::transaction(function () use ($date) {
          if (!$date) {
              $valueStore = ValueStore::query()->where("name", "wave_detail_last_sync_date")->lockForUpdate()->first();
              $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
          }

          $pageSize = 1000;
          $filter = "EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')";
          $countSql = "SELECT count(*) count FROM DOC_WAVE_DETAILS WHERE " . $filter;
          $pageSql = "SELECT * FROM (SELECT ORDERNO,WAVENO,SEQNO,EDITTIME, ROWNUM AS rowno FROM ("
              . "SELECT * FROM DOC_WAVE_DETAILS WHERE " . $filter . " ORDER BY EDITTIME"
              . ") WHERE ROWNUM <= " . $pageSize . ")wave WHERE wave.rowno >= 0";
          // Ordered so that the last row, which detailExe() stores as the new cursor, is the newest one.
          $restSql = "SELECT ORDERNO,WAVENO,SEQNO,EDITTIME FROM DOC_WAVE_DETAILS WHERE " . $filter . " ORDER BY EDITTIME";
          $oracle = DB::connection("oracle");

          while (true) {
              $count = $oracle->selectOne(DB::raw($countSql), [$date]);
              if ($count->count <= $pageSize) {
                  break;
              }

              $details = $oracle->select(DB::raw($pageSql), [$date]);
              if (count($details) < 1) {
                  // Rows disappeared between the count and the fetch; nothing to page through.
                  break;
              }
              $this->detailExe($details);

              $next = $details[count($details) - 1]->edittime;
              $nextTs = strtotime((string)$next);
              $stalled = (string)$next === (string)$date
                  || ($nextTs !== false && $nextTs === strtotime((string)$date));
              if ($stalled) {
                  // A whole page shares the cursor's timestamp: paging cannot advance.
                  break;
              }
              $date = $next;
          }

          $details = $oracle->select(DB::raw($restSql), [$date]);
          $this->detailExe($details);
      });
  }

  /**
   * Apply a set of WMS wave-detail rows locally.
   *
   * Steps, in order:
   *  1. Reconcile order_bins with each order's SEQNO (update / insert / delete).
   *  2. Back-fill batches for wave numbers that do not exist locally yet.
   *  3. Point every affected order at its batch.
   *  4. For waves still without a batch, insert them, link their orders and
   *     register tasks for the new batches.
   *  5. Move the "wave_detail_last_sync_date" cursor to the last row's edittime.
   *
   * @param array $details Rows exposing orderno, waveno, seqno and edittime.
   * @return void
   */
  private function detailExe($details)
  {
      if (count($details) < 1) {
          return;
      }

      $map = [];        // waveno => list of order numbers in that wave
      $nos = [];        // distinct wave numbers, in first-seen order
      $orderCodes = [];
      $seqnos = [];     // orderno => seqno (the last row wins)
      foreach ($details as $detail) {
          if (isset($map[$detail->waveno])) {
              $map[$detail->waveno][] = $detail->orderno;
          } else {
              $map[$detail->waveno] = [$detail->orderno];
              $nos[] = $detail->waveno;
          }
          $orderCodes[] = $detail->orderno;
          $seqnos[$detail->orderno] = $detail->seqno;
      }

      $orders = Order::query()->select("id", "batch_id", "code")->whereIn("code", $orderCodes)->get();
      if (count($orders) > 0) {
          $orderIds = [];
          $orderMap = []; // order id => bin number still to be applied
          foreach ($orders as $order) {
              $orderIds[] = $order->id;
              $orderMap[$order->id] = $seqnos[$order->code];
          }

          // First element is the column header row expected by BatchUpdateService.
          $updateBin = [["id", "number"]];
          $insertBin = [];
          $deleteBin = [];
          $orderBins = OrderBin::query()->select("id", "order_id", "number")->whereIn("order_id", $orderIds)->get();
          foreach ($orderBins as $orderBin) {
              // No pending number for this order: either its seqno is null or an
              // earlier bin row of the same order already consumed it.
              if (!isset($orderMap[$orderBin->order_id])) {
                  $deleteBin[] = $orderBin->id;
                  continue;
              }
              if ($orderBin->number != $orderMap[$orderBin->order_id]) {
                  $updateBin[] = ["id" => $orderBin->id, "number" => $orderMap[$orderBin->order_id]];
              }
              unset($orderMap[$orderBin->order_id]);
          }

          // Whatever is left has no bin row yet.
          $d = date('Y-m-d H:i:s');
          foreach ($orderMap as $orderId => $binNumber) {
              $insertBin[] = [
                  "order_id" => $orderId,
                  "number" => $binNumber,
                  'created_at' => $d
              ];
          }

          if (count($updateBin) > 1) {
              app(BatchUpdateService::class)->batchUpdate("order_bins", $updateBin);
              LogService::log(__METHOD__, "波次同步-更新订单格口号", json_encode($updateBin));
          }
          if ($insertBin) {
              OrderBin::query()->insert($insertBin);
              LogService::log(__METHOD__, "波次同步-录入订单格口号", json_encode($insertBin));
          }
          if ($deleteBin) {
              OrderBin::destroy($deleteBin);
              LogService::log(__METHOD__, "波次同步-删除订单格口号", json_encode($deleteBin));
          }
      }

      $batches = $this->batchService->get(["code" => $nos]);
      $batchDiff = array_values(array_unique(array_diff($nos, array_column($batches->toArray(), "code"))));
      if (count($batchDiff) > 0) {
          // Bound placeholders instead of concatenated literals, chunked because
          // Oracle rejects IN lists with more than 1000 expressions.
          $wmsBatches = [];
          foreach (array_chunk($batchDiff, 1000) as $chunk) {
              $placeholders = implode(",", array_fill(0, count($chunk), "?"));
              $sql = "SELECT * FROM DOC_WAVE_HEADER WHERE WAVENO IN (" . $placeholders . ")";
              $wmsBatches = array_merge($wmsBatches, DB::connection("oracle")->select(DB::raw($sql), $chunk));
          }
          // These rows come from a WAVENO lookup, not from the header sync window,
          // so the header cursor must stay where the header sync left it.
          $this->headerExe($wmsBatches, false);
          $batches = $this->batchService->get(["code" => $nos]);
      }

      // First element is the column header row expected by OrderService::batchUpdate.
      $updateOrder = [["code", "batch_id"]];
      foreach ($batches as $batch) {
          if (!isset($map[$batch->code])) {
              // Already consumed by an earlier row with the same code, or not part of this run.
              continue;
          }
          foreach ($map[$batch->code] as $on) {
              $updateOrder[] = [
                  "code" => $on,
                  "batch_id" => $batch->id
              ];
          }
          unset($map[$batch->code]);
      }
      if (count($updateOrder) > 1) {
          app("OrderService")->batchUpdate($updateOrder); // write the batch id back onto the orders
          LogService::log(__METHOD__, "波次同步-修改订单波次号", json_encode($updateOrder));
      }

      // Waves that still have no local batch after the back-fill above.
      if ($map) {
          $waveCodes = array_keys($map);
          $waves = $this->service->get(["waveno" => $waveCodes], ["waveno" => "in"]);
          $insert = [];
          foreach ($waves as $wave) {
              // NOTE(review): assumes codeGetOwner() always returns an owner object - confirm.
              $owner = app("OwnerService")->codeGetOwner($wave->customerid);
              $insert[] = [
                  "code" => $wave->waveno,
                  "status" => '未处理',
                  "wms_status" => $this->wms_status($wave),
                  "wms_type" => $wave->descr,
                  "created_at" => $wave->addtime,
                  "updated_at" => $wave->edittime,
                  "owner_id" => $owner->id,
              ];
          }
          if ($insert) {
              $this->batchService->insert($insert);
              LogService::log(__METHOD__, "SUCCESS-同步插入波次成功", json_encode($insert));
              $batches = $this->batchService->get(["code" => $waveCodes]);
              foreach ($batches as $batch) {
                  app("OrderService")->update(["code" => $map[$batch->code]], ["batch_id" => $batch->id]);
              }
              try {
                  $this->batchService->assignTasks($batches); // register tasks for the new batches here
              } catch (Exception $e) {
                  // Deliberately swallowed so the surrounding transaction is not affected;
                  // per the original note, ordinary failures are already logged inside assignTasks.
              }
          }
      }

      ValueStore::query()->where("name", "wave_detail_last_sync_date")->update(["value" => $details[count($details) - 1]->edittime]);
  }
  /**
   * Translate a WMS wave status code into its display label.
   *
   * Known codes: 0/"00" => '创建', 40 => '部分收货', 62 => '部分装箱',
   * 90 => '取消', 99 => '完成'. Anything else is returned as its string form.
   *
   * Only genuinely numeric values are matched against the table. The previous
   * loose switch compared against the literal 00, so null (and, on PHP 7, empty
   * or non-numeric strings) was reported as '创建'.
   *
   * @param object $wave Row exposing a wavestatus property.
   * @return string
   */
  public function wms_status($wave): string
  {
      $raw = $wave->wavestatus;

      if (is_numeric($raw)) {
          $labels = [
              0 => '创建',
              40 => '部分收货',
              62 => '部分装箱',
              90 => '取消',
              99 => '完成',
          ];
          $code = $raw + 0; // int or float, depending on the input
          // Accept whole numbers only, so "40.5" is not truncated into code 40.
          if ($code == (int)$code && isset($labels[(int)$code])) {
              return $labels[(int)$code];
          }
      }

      return (string)$raw;
  }
  281. }