MilesCranmer commited on
Commit
7b70a53
·
1 Parent(s): 29db367

Fix blocking of populations via RemoteChannel()

Browse files
Files changed (1) hide show
  1. julia/sr.jl +15 -5
julia/sr.jl CHANGED
@@ -748,6 +748,8 @@ function fullRun(niterations::Integer;
748
  )
749
  # 1. Start a population on every process
750
  allPops = Future[]
 
 
751
  bestSubPops = [Population(1) for j=1:npopulations]
752
  hallOfFame = HallOfFame()
753
 
@@ -758,19 +760,26 @@ function fullRun(niterations::Integer;
758
 
759
  # # 2. Start the cycle on every process:
760
  @sync for i=1:npopulations
761
- @async allPops[i] = @spawn run(fetch(allPops[i]), ncyclesperiteration, verbosity=verbosity)
762
  end
763
  println("Started!")
764
  cycles_complete = npopulations * niterations
765
 
766
  last_print_time = time()
767
  num_equations = 0.0
768
- print_every_n_seconds = 1
 
 
 
 
 
769
 
770
  while cycles_complete > 0
771
  for i=1:npopulations
772
- if isready(allPops[i])
773
- cur_pop = fetch(allPops[i])
 
 
774
  bestSubPops[i] = bestSubPop(cur_pop, topn=topn)
775
 
776
  #Try normal copy...
@@ -827,7 +836,7 @@ function fullRun(niterations::Integer;
827
  end
828
  end
829
 
830
- @async allPops[i] = @spawn let
831
  tmp_pop = run(cur_pop, ncyclesperiteration, verbosity=verbosity)
832
  for j=1:tmp_pop.n
833
  if rand() < 0.1
@@ -840,6 +849,7 @@ function fullRun(niterations::Integer;
840
  end
841
  tmp_pop
842
  end
 
843
 
844
  cycles_complete -= 1
845
  num_equations += ncyclesperiteration * npop / 10.0
 
748
  )
749
  # 1. Start a population on every process
750
  allPops = Future[]
751
+ # Set up a channel to send finished populations back to head node
752
+ channels = [RemoteChannel(1) for j=1:npopulations]
753
  bestSubPops = [Population(1) for j=1:npopulations]
754
  hallOfFame = HallOfFame()
755
 
 
760
 
761
  # # 2. Start the cycle on every process:
762
  @sync for i=1:npopulations
763
+ @async allPops[i] = @spawnat :any run(fetch(allPops[i]), ncyclesperiteration, verbosity=verbosity)
764
  end
765
  println("Started!")
766
  cycles_complete = npopulations * niterations
767
 
768
  last_print_time = time()
769
  num_equations = 0.0
770
+ print_every_n_seconds = 5
771
+
772
+ for i=1:npopulations
773
+ # Start listening for each population to finish:
774
+ @async put!(channels[i], fetch(allPops[i]))
775
+ end
776
 
777
  while cycles_complete > 0
778
  for i=1:npopulations
779
+ # Non-blocking check if a population is ready:
780
+ if isready(channels[i])
781
+ # Take the fetch operation from the channel since its ready
782
+ cur_pop = take!(channels[i])
783
  bestSubPops[i] = bestSubPop(cur_pop, topn=topn)
784
 
785
  #Try normal copy...
 
836
  end
837
  end
838
 
839
+ allPops[i] = @spawnat :any let
840
  tmp_pop = run(cur_pop, ncyclesperiteration, verbosity=verbosity)
841
  for j=1:tmp_pop.n
842
  if rand() < 0.1
 
849
  end
850
  tmp_pop
851
  end
852
+ @async put!(channels[i], fetch(allPops[i]))
853
 
854
  cycles_complete -= 1
855
  num_equations += ncyclesperiteration * npop / 10.0