SyncBatchTask.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Order;
  4. use App\OrderBin;
  5. use App\Services\BatchService;
  6. use App\Services\common\BatchUpdateService;
  7. use App\Services\DocWaveHeaderService;
  8. use App\Services\LogService;
  9. use App\ValueStore;
  10. use Carbon\Carbon;
  11. use Illuminate\Console\Command;
  12. use Illuminate\Support\Facades\DB;
  13. class SyncBatchTask extends Command
  14. {
  15. protected $signature = 'sync:batch';
  16. protected $description = 'sync wms batch task';
  17. /** @var DocWaveHeaderService $service */
  18. private $service;
  19. /** @var BatchService $batchService */
  20. private $batchService;
  21. public function __construct()
  22. {
  23. parent::__construct();
  24. $this->service = app(DocWaveHeaderService::class);
  25. $this->batchService = app(BatchService::class);
  26. }
  27. public function handle()
  28. {
  29. sleep(rand(0,10));
  30. $this->disposeHeader();
  31. $this->disposeDetail();
  32. }
  33. private function disposeHeader($date = null)
  34. {
  35. DB::transaction(function ()use($date){
  36. //获取更新时间与WMS数据
  37. if (!$date){
  38. $valueStore = ValueStore::query()->where("name","wave_last_sync_date")->lockForUpdate()->first();
  39. $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
  40. }
  41. $count = DB::connection("oracle")->selectOne(DB::raw("SELECT COUNT(*) count FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),[$date]);
  42. if ($count->count > 500){
  43. $query = DB::raw(<<<sql
  44. SELECT * FROM (SELECT header.*, ROWNUM AS rowno FROM (
  45. SELECT * FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')
  46. ORDER BY EDITTIME)header WHERE ROWNUM <= 500)wave WHERE wave.rowno >= 0
  47. sql
  48. );
  49. $waves = DB::connection("oracle")->select($query,[$date]);
  50. $this->headerExe($waves);
  51. $this->disposeHeader($waves[count($waves)-1]->edittime);
  52. }else{
  53. $waves = $this->service->get(["edittime"=>$date],["edittime"=>"gtOrEqual"]);
  54. $this->headerExe($waves);
  55. }
  56. });
  57. }
  58. private function headerExe($waves)
  59. {
  60. if (count($waves) < 1)return;
  61. //获取本地数据对比差异
  62. $codes = [];
  63. foreach ($waves as $wave){
  64. $codes[] = $wave->waveno;
  65. }
  66. $map = [];
  67. $batches = $this->batchService->get(["code"=>$codes]);
  68. if ($batches){
  69. foreach ($batches as $batch)$map[$batch->code] = $batch->id;
  70. }
  71. $update = [["id","wms_status","remark","updated_at"]];
  72. $insert = [];
  73. foreach ($waves as $wave){
  74. if (isset($map[$wave->waveno])){
  75. $update[] = [
  76. "id" => $map[$wave->waveno],
  77. "wms_status" => $this->wms_status($wave),
  78. "remark"=>$wave->descr,
  79. "updated_at"=>$wave->edittime,
  80. ];
  81. continue;
  82. }
  83. $owner = app("OwnerService")->codeGetOwner($wave->customerid);
  84. $insert[] = [
  85. "code" => $wave->waveno,
  86. "remark"=>$wave->descr,
  87. "status" => '未处理',
  88. "wms_status" => $this->wms_status($wave),
  89. "created_at"=>$wave->addtime,
  90. "updated_at"=>$wave->edittime,
  91. "owner_id"=>$owner->id,
  92. ];
  93. }
  94. //存在则更新
  95. if (count($update)>1){
  96. $bool = app(BatchUpdateService::class)->batchUpdate("batches",$update);
  97. if ($bool)LogService::log(__METHOD__,"SUCCESS-同步更新波次成功",json_encode($update));
  98. else LogService::log(__METHOD__,"ERROR-同步更新波次失败",json_encode($update));
  99. }
  100. //不存在则录入
  101. if ($insert){
  102. $this->batchService->insert($insert);
  103. LogService::log(__METHOD__,"SUCCESS-同步插入波次成功",json_encode($insert));
  104. }
  105. ValueStore::query()->where("name","wave_last_sync_date")->update(["value"=>$waves[count($waves)-1]->edittime]);
  106. }
  107. public function disposeDetail($date = null)
  108. {
  109. DB::transaction(function ()use($date){
  110. if (!$date){
  111. $valueStore = ValueStore::query()->where("name","wave_detail_last_sync_date")->lockForUpdate()->first();
  112. $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
  113. }
  114. $count = DB::connection("oracle")->selectOne(DB::raw("SELECT count(*) count FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),[$date]);
  115. if ($count->count > 1000){
  116. $sql = <<<sql
  117. SELECT * FROM (SELECT ORDERNO,WAVENO,SEQNO,EDITTIME, ROWNUM AS rowno FROM (
  118. SELECT * FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')
  119. ORDER BY EDITTIME) WHERE ROWNUM <= 1000)wave WHERE wave.rowno >= 0
  120. sql;
  121. $details = DB::connection("oracle")->select(DB::raw($sql),[$date]);
  122. $this->detailExe($details);
  123. $this->disposeDetail($details[count($details)-1]->edittime);
  124. }else{
  125. $sql = "SELECT ORDERNO,WAVENO,SEQNO,EDITTIME FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')";
  126. $details = DB::connection("oracle")->select(DB::raw($sql),[$date]);
  127. $this->detailExe($details);
  128. }
  129. });
  130. }
  131. private function detailExe($details)
  132. {
  133. if (count($details) < 1)return;
  134. $map = [];
  135. $nos = [];
  136. $orderCodes = [];
  137. $seqnos = [];
  138. foreach ($details as $detail){
  139. if (isset($map[$detail->waveno]))$map[$detail->waveno][] = $detail->orderno;
  140. else {
  141. $map[$detail->waveno] = [$detail->orderno];
  142. $nos[] = $detail->waveno;
  143. }
  144. $orderCodes[] = $detail->orderno;
  145. $seqnos[$detail->orderno] = $detail->seqno;
  146. }
  147. $orders = Order::query()->select("id","code")->whereIn("code",$orderCodes)->get();
  148. if (count($orderCodes) != count($orders))LogService::log(__METHOD__,"波次同步-本地订单缺失",json_encode($orderCodes)." | ".count($orderCodes)." | ".count($orders));
  149. if ($orders){
  150. $orderIds = [];
  151. $orderMap = [];
  152. foreach ($orders as $order){
  153. $orderIds[] = $order->id;
  154. $orderMap[$order->id] = $seqnos[$order->code];
  155. }
  156. $updateBin = [["id","number"]];
  157. $insertBin = [];
  158. $deleteBin = [];
  159. $orderBins = OrderBin::query()->select("id","order_id","number")->whereIn("order_id",$orderIds)->get();
  160. foreach ($orderBins as $orderBin){
  161. if (!isset($orderMap[$orderBin->order_id])){
  162. $deleteBin[] = $orderBin->id;
  163. continue;
  164. }
  165. if ($orderBin->number != $orderMap[$orderBin->order_id])$updateBin[]=["id"=>$orderBin->id,"number"=>$orderMap[$orderBin->order_id]];
  166. unset($orderMap[$orderBin->order_id]);
  167. }
  168. foreach ($orderMap as $orderId=>$binNumber){
  169. $insertBin[]=[
  170. "order_id"=>$orderId,
  171. "number"=>$binNumber
  172. ];
  173. }
  174. if (count($updateBin)>1){
  175. app(BatchUpdateService::class)->batchUpdate("order_bins",$updateBin);
  176. LogService::log(__METHOD__,"波次同步-更新订单格口号",json_encode($updateBin));
  177. }
  178. if ($insertBin){
  179. OrderBin::query()->insert($insertBin);
  180. LogService::log(__METHOD__,"波次同步-录入订单格口号",json_encode($insertBin));
  181. }
  182. if ($deleteBin){
  183. OrderBin::destroy($deleteBin);
  184. LogService::log(__METHOD__,"波次同步-删除订单格口号",json_encode($deleteBin));
  185. }
  186. }
  187. $batches = $this->batchService->get(["code"=>$nos]);
  188. foreach ($batches as $batch){
  189. app("OrderService")->update(["code"=>$map[$batch->code]],["batch_id"=>$batch->id]);
  190. unset($map[$batch->code]);
  191. }
  192. if ($map){
  193. $waveCodes = array_keys($map);
  194. $waves = $this->service->get(["waveno"=>$waveCodes],["waveno"=>"in"]);
  195. $insert = [];
  196. foreach ($waves as $wave){
  197. $owner = app("OwnerService")->codeGetOwner($wave->customerid);
  198. $insert[] = [
  199. "code" => $wave->waveno,
  200. "status" => '未处理',
  201. "wms_status" => $this->wms_status($wave),
  202. "remark"=>$wave->descr,
  203. "created_at"=>$wave->addtime,
  204. "updated_at"=>$wave->edittime,
  205. "owner_id"=>$owner->id,
  206. ];
  207. }
  208. if ($insert){
  209. $this->batchService->insert($insert);
  210. LogService::log(__METHOD__,"SUCCESS-同步插入波次成功",json_encode($insert));
  211. $batches = $this->batchService->get(["code"=>$waveCodes]);
  212. foreach ($batches as $batch){
  213. app("OrderService")->update(["code"=>$map[$batch->code]],["batch_id"=>$batch->id]);
  214. }
  215. }
  216. }
  217. // $this->batchService->assignTasks($batches);
  218. ValueStore::query()->where("name","wave_detail_last_sync_date")->update(["value"=>$details[count($details)-1]->edittime]);
  219. }
  220. /**
  221. * @param $wave
  222. * @return string
  223. */
  224. public function wms_status($wave): string
  225. {
  226. switch ($wave->wavestatus) {
  227. case 00:
  228. $wms_status = '创建';
  229. break;
  230. case 40:
  231. $wms_status = '部分收货';
  232. break;
  233. case 90:
  234. $wms_status = '取消';
  235. break;
  236. case 99:
  237. $wms_status = '完成';
  238. break;
  239. case 62:
  240. $wms_status = '部分装箱';
  241. break;
  242. default:
  243. $wms_status = (string)$wave->wavestatus;
  244. }
  245. return $wms_status;
  246. }
  247. }