CP2K 2.4 (Revision 12889)

realspace_grid_types.f90

Go to the documentation of this file.
00001 !-----------------------------------------------------------------------------!
00002 !   CP2K: A general program to perform molecular dynamics simulations         !
00003 !   Copyright (C) 2000 - 2013  CP2K developers group                          !
00004 !-----------------------------------------------------------------------------!
00005 
00006 ! *****************************************************************************
00016 MODULE realspace_grid_types
00017   USE cp_array_r_utils,                ONLY: cp_1d_r_p_type
00018   USE input_constants,                 ONLY: rsgrid_automatic,&
00019                                              rsgrid_replicated
00020   USE input_section_types,             ONLY: section_vals_get,&
00021                                              section_vals_type,&
00022                                              section_vals_val_get
00023   USE kahan_sum,                       ONLY: accurate_sum
00024   USE kinds,                           ONLY: dp, int_8
00025   USE mathlib,                         ONLY: det_3x3
00026   USE message_passing,                 ONLY: &
00027        mp_comm_dup, mp_comm_free, mp_environ, mp_irecv, mp_isend, &
00028        mp_isendrecv, mp_max, mp_min, mp_request_null, mp_sum, mp_sync, &
00029        mp_waitall, mp_waitany
00030   USE pw_grid_types,                   ONLY: PW_MODE_LOCAL,&
00031                                              pw_grid_type
00032   USE pw_grids,                        ONLY: pw_grid_release,&
00033                                              pw_grid_retain
00034   USE pw_methods,                      ONLY: pw_integrate_function
00035   USE pw_types,                        ONLY: COMPLEXDATA3D,&
00036                                              REALDATA3D,&
00037                                              pw_type
00038   USE termination,                     ONLY: stop_program
00039   USE timings,                         ONLY: timeset,&
00040                                              timestop
00041   USE util,                            ONLY: get_limit
00042 #include "cp_common_uses.h"
00043 
00044 #if defined(__HAS_NO_OMP_3)
00045 #define __COLLAPSE3
00046 #else
00047 #define __COLLAPSE3 collapse(3)
00048 #endif
00049 
00050   IMPLICIT NONE
00051 
00052   PRIVATE
00053   PUBLIC :: realspace_grid_type,&
00054             realspace_grid_desc_type,&
00055             realspace_grid_p_type,&
00056             realspace_grid_desc_p_type,&
00057             realspace_grid_input_type
00058 
00059   PUBLIC :: rs_pw_transfer,&
00060             rs_grid_zero,&
00061             rs_grid_set_box,&
00062             rs_grid_create,&
00063             rs_grid_create_descriptor,&
00064             rs_grid_retain,&
00065             rs_grid_retain_descriptor,&
00066             rs_grid_release,&
00067             rs_grid_release_descriptor,&
00068             rs_grid_reorder_ranks, &
00069             rs_grid_print, &
00070             rs_grid_locate_rank, &
00071             rs_grid_max_ngpts, &
00072             rs_grid_mult_and_add,&
00073             init_input_type
00074   LOGICAL, PRIVATE, PARAMETER :: debug_this_module=.FALSE.
00075   CHARACTER(len=*), PARAMETER, PRIVATE :: moduleN = 'realspace_grid_types'
00076   INTEGER, SAVE, PRIVATE :: last_rs_id=0
00077   INTEGER, SAVE, PRIVATE :: allocated_rs_grid_count=0
00078   INTEGER, PARAMETER, PUBLIC :: rs2pw=11,pw2rs=12
00079 
00080 ! *****************************************************************************
00081   TYPE realspace_grid_input_type
00082      INTEGER       :: distribution_type
00083      INTEGER       :: distribution_layout(3)
00084      REAL(KIND=dp) :: memory_factor
00085      LOGICAL       :: lock_distribution
00086      INTEGER       :: nsmax
00087      REAL(KIND=dp) :: halo_reduction_factor
00088   END TYPE realspace_grid_input_type
00089 
00090 ! *****************************************************************************
00091   TYPE realspace_grid_desc_type
00092      INTEGER :: grid_id                                  ! tag of the pw_grid
00093 
00094      TYPE(pw_grid_type), POINTER   :: pw                 ! the pw grid
00095 
00096      INTEGER :: ref_count                                ! reference count
00097 
00098      INTEGER(int_8) :: ngpts                             ! # grid points
00099      INTEGER, DIMENSION (3) :: npts                      ! # grid points per dimension
00100      INTEGER, DIMENSION (3) :: lb                        ! lower bounds
00101      INTEGER, DIMENSION (3) :: ub                        ! upper bounds
00102 
00103      INTEGER :: border                                   ! border points
00104 
00105      INTEGER, DIMENSION (3) :: perd                      ! periodicity enforced
00106      REAL(KIND=dp), DIMENSION(3,3) :: dh                 ! incremental grid matrix
00107      REAL(KIND=dp), DIMENSION(3,3) :: dh_inv             ! inverse incremental grid matrix
00108      LOGICAL :: orthorhombic                             ! grid symmetry
00109 
00110      LOGICAL :: parallel                                 ! whether the corresponding pw grid is distributed
00111      LOGICAL :: distributed                              ! whether the rs grid is distributed
00112      ! these MPI related quantities are only meaningful depending on how the grid has been layed out
00113      ! they are most useful for fully distributed grids, where they reflect the topology of the grid
00114      INTEGER :: group
00115      INTEGER :: my_pos
00116      LOGICAL :: group_head
00117      INTEGER :: group_size
00118      INTEGER, DIMENSION (3) :: group_dim
00119      INTEGER, DIMENSION (3) :: group_coor
00120      INTEGER, DIMENSION (3) :: neighbours
00121      ! only meaningful on distributed grids
00122      ! a list of bounds for each CPU
00123      INTEGER, DIMENSION (:,:), POINTER :: lb_global
00124      INTEGER, DIMENSION (:,:), POINTER :: ub_global
00125      ! a mapping from linear rank to 3d coord
00126      INTEGER, DIMENSION (:,:), POINTER :: rank2coord
00127      INTEGER, DIMENSION (:,:,:), POINTER :: coord2rank
00128      ! a mapping from index to rank (which allows to figure out easily on which rank a given point of the grid is)
00129      INTEGER, DIMENSION (:), POINTER :: x2coord
00130      INTEGER, DIMENSION (:), POINTER :: y2coord
00131      INTEGER, DIMENSION (:), POINTER :: z2coord
00132 
00133      INTEGER                :: my_virtual_pos
00134      INTEGER, DIMENSION (3) :: virtual_group_coor
00135 
00136      INTEGER, DIMENSION(:), ALLOCATABLE :: virtual2real, real2virtual
00137 
00138   END TYPE realspace_grid_desc_type
00139 
00140   TYPE realspace_grid_type
00141 
00142      TYPE(realspace_grid_desc_type), POINTER :: desc
00143 
00144      INTEGER :: id_nr                                    ! unique identifier of rs
00145      INTEGER :: ref_count                                ! reference count
00146 
00147      INTEGER :: ngpts_local                              ! local dimensions
00148      INTEGER, DIMENSION (3) :: npts_local
00149      INTEGER, DIMENSION (3) :: lb_local
00150      INTEGER, DIMENSION (3) :: ub_local
00151      INTEGER, DIMENSION (3) :: lb_real                   ! lower bounds of the real local data
00152      INTEGER, DIMENSION (3) :: ub_real                   ! upper bounds of the real local data
00153 
00154      INTEGER, DIMENSION (:), POINTER :: px,py,pz         ! index translators
00155      REAL(KIND=dp), DIMENSION ( :, :, : ), POINTER :: r   ! the grid
00156 
00157   END TYPE realspace_grid_type
00158 
00159 ! *****************************************************************************
00160   TYPE realspace_grid_p_type
00161      TYPE(realspace_grid_type), POINTER :: rs_grid
00162   END TYPE realspace_grid_p_type
00163 
00164   TYPE realspace_grid_desc_p_type
00165      TYPE(realspace_grid_desc_type), POINTER :: rs_desc
00166   END TYPE realspace_grid_desc_p_type
00167 
00168 CONTAINS
00169 
00170 ! *****************************************************************************
00179   SUBROUTINE init_input_type(input_settings,nsmax,rs_grid_section,ilevel,higher_grid_layout,error)
00180     TYPE(realspace_grid_input_type), 
00181       INTENT(OUT)                            :: input_settings
00182     INTEGER, INTENT(IN)                      :: nsmax
00183     TYPE(section_vals_type), OPTIONAL, 
00184       POINTER                                :: rs_grid_section
00185     INTEGER, INTENT(IN)                      :: ilevel
00186     INTEGER, DIMENSION(3), INTENT(IN)        :: higher_grid_layout
00187     TYPE(cp_error_type), INTENT(inout)       :: error
00188 
00189     INTEGER                                  :: isection, 
00190                                                 max_distributed_level, 
00191                                                 nsection
00192     INTEGER, DIMENSION(:), POINTER           :: tmp
00193 
00194     IF (PRESENT(rs_grid_section)) THEN
00195        input_settings%nsmax=nsmax
00196        ! we use the section corresponding to the level, or the largest available one
00197        ! i.e. the last section defines all following ones
00198        CALL section_vals_get(rs_grid_section,n_repetition=nsection,error=error)
00199        isection=MAX(1,MIN(ilevel,nsection))
00200        CALL section_vals_val_get(rs_grid_section,"DISTRIBUTION_TYPE",&
00201             i_rep_section=isection,&
00202             i_val=input_settings%distribution_type,error=error)
00203        CALL section_vals_val_get(rs_grid_section,"DISTRIBUTION_LAYOUT",&
00204             i_rep_section=isection,&
00205             i_vals=tmp,error=error)
00206        input_settings%distribution_layout=tmp
00207        CALL section_vals_val_get(rs_grid_section,"MEMORY_FACTOR",&
00208             i_rep_section=isection,&
00209             r_val=input_settings%memory_factor,error=error)
00210        CALL section_vals_val_get(rs_grid_section,"HALO_REDUCTION_FACTOR",&
00211             i_rep_section=isection,&
00212             r_val=input_settings%halo_reduction_factor,error=error)
00213        CALL section_vals_val_get(rs_grid_section,"LOCK_DISTRIBUTION",&
00214             i_rep_section=isection,&
00215             l_val=input_settings%lock_distribution,error=error)
00216        CALL section_vals_val_get(rs_grid_section,"MAX_DISTRIBUTED_LEVEL",&
00217             i_rep_section=isection,&
00218             i_val=max_distributed_level,error=error)
00219 
00220        ! multigrids that are to coarse are not distributed in the automatic scheme
00221        IF (input_settings%distribution_type == rsgrid_automatic) THEN
00222           IF (ilevel>max_distributed_level) THEN
00223              input_settings%distribution_type=rsgrid_replicated
00224           ENDIF
00225        ENDIF
00226     ELSE
00227        input_settings%nsmax=-1
00228        input_settings%distribution_type=rsgrid_replicated
00229        input_settings%lock_distribution=.FALSE.
00230        input_settings%halo_reduction_factor=1.0_dp
00231     ENDIF
00232     IF (input_settings%lock_distribution) THEN
00233        IF (ALL(higher_grid_layout>0)) input_settings%distribution_layout=higher_grid_layout
00234     ENDIF
00235   END SUBROUTINE init_input_type
00236 
00237 ! *****************************************************************************
00241   FUNCTION rs_grid_locate_rank(rs_desc,rank_in,shift) RESULT(rank_out)
00242     TYPE(realspace_grid_desc_type), POINTER  :: rs_desc
00243     INTEGER, INTENT(IN)                      :: rank_in
00244     INTEGER, DIMENSION(3), INTENT(IN)        :: shift
00245     INTEGER                                  :: rank_out
00246 
00247     INTEGER                                  :: coord(3)
00248 
00249     coord=MODULO(rs_desc%rank2coord(:,rank_in)+shift,rs_desc%group_dim)
00250     rank_out=rs_desc%coord2rank(coord(1),coord(2),coord(3))
00251   END FUNCTION rs_grid_locate_rank
00252 
00253 ! *****************************************************************************
00261   SUBROUTINE rs_grid_create_descriptor ( desc, pw_grid, input_settings, error)
00262     TYPE(realspace_grid_desc_type), POINTER  :: desc
00263     TYPE(pw_grid_type), POINTER              :: pw_grid
00264     TYPE(realspace_grid_input_type), 
00265       INTENT(IN)                             :: input_settings
00266     TYPE(cp_error_type), INTENT(inout)       :: error
00267 
00268     CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_create_descriptor', 
00269       routineP = moduleN//':'//routineN
00270 
00271     INTEGER                                  :: dir, handle, i, j, k, l, 
00272                                                 lb( 2 ), n_slices(3), 
00273                                                 n_slices_tmp(3), 
00274                                                 neighbours(3), nmin, stat
00275     LOGICAL                                  :: failure, overlap
00276     REAL(KIND=dp)                            :: ratio, ratio_best, volume, 
00277                                                 volume_dist
00278 
00279     CALL timeset(routineN,handle)
00280 
00281     failure = .FALSE.
00282     CPPrecondition(ASSOCIATED(pw_grid),cp_failure_level,routineP,error,failure)
00283 
00284     ALLOCATE(desc,stat=stat)
00285     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00286 
00287     CALL mp_sync(pw_grid % para % group)
00288 
00289     NULLIFY(desc % rank2coord,desc % coord2rank,desc % lb_global,desc % ub_global,desc % x2coord,desc % y2coord,desc % z2coord)
00290 
00291     desc % pw => pw_grid
00292     CALL pw_grid_retain(desc%pw,error=error)
00293 
00294     desc % dh = pw_grid%dh
00295     desc % dh_inv = pw_grid%dh_inv
00296     desc % orthorhombic = pw_grid%orthorhombic
00297     desc % grid_id = pw_grid % id_nr
00298     desc % ref_count = 1
00299 
00300     IF ( pw_grid % para % mode == PW_MODE_LOCAL ) THEN
00301        ! The corresponding group has dimension 1
00302        ! All operations will be done localy
00303        desc % npts = pw_grid % npts
00304        desc % ngpts = PRODUCT ( INT(desc % npts,KIND=int_8) )
00305        desc % lb = pw_grid % bounds ( 1, : )
00306        desc % ub = pw_grid % bounds ( 2, : )
00307        desc % perd = 1
00308        desc % border = 0
00309        desc % parallel = .FALSE.
00310        desc % distributed = .FALSE.
00311        desc % group = -1
00312        desc % group_size = 1
00313        desc % group_head = .TRUE.
00314        desc % group_dim = 1
00315        desc % group_coor = 0
00316        desc % my_pos = 0
00317     ELSE
00318        ! group size of desc grid
00319        ! global grid dimensions are still the same
00320        desc % group_size = pw_grid % para % group_size
00321        desc % npts = pw_grid % npts
00322        desc % ngpts = PRODUCT ( INT(desc % npts, KIND=int_8) )
00323        desc % lb = pw_grid % bounds ( 1, : )
00324        desc % ub = pw_grid % bounds ( 2, : )
00325 
00326        ! this is the eventual border size
00327        nmin = ( input_settings % nsmax + 1 ) / 2
00328        nmin = MAX(0,NINT(nmin* input_settings % halo_reduction_factor))
00329 
00330        IF ( input_settings % distribution_type == rsgrid_replicated ) THEN
00331 
00332           n_slices = 1
00333 
00334        ELSE
00335           n_slices = 1
00336           ratio_best=-HUGE(ratio_best)
00337 
00338           ! don't allow distributions with more processodesc than real grid points
00339           DO k=1, MIN(desc % npts (3), desc % group_size)
00340           DO j=1, MIN(desc % npts (2), desc % group_size)
00341              i= MIN (desc % npts (1), desc % group_size/(j*k) )
00342              n_slices_tmp=(/i,j,k/)
00343 
00344              ! we don't match the actual number of CPUs
00345              IF (PRODUCT(n_slices_tmp) .NE. desc % group_size) CYCLE
00346 
00347              ! we see if there has been a input constraint
00348              ! i.e. if the layout is not -1 we need to fullfil it
00349              IF (.NOT. ALL(PACK(n_slices_tmp==input_settings % distribution_layout,&
00350                                 (/-1,-1,-1/)/=input_settings % distribution_layout )&
00351                           )) CYCLE
00352 
00353              ! we currently can not work with a grid that has bordedesc that overlaps with the local data of the grid itself
00354              overlap=.FALSE.
00355              DO dir=1,3
00356                 IF (n_slices_tmp(dir)>1) THEN
00357                    neighbours(dir) = HUGE(0)
00358                    DO l=0,n_slices_tmp(dir)-1
00359                       lb = get_limit ( desc % npts ( dir ),  n_slices_tmp( dir ), l )
00360                       neighbours(dir) = MIN(lb (2) -lb (1) + 1, neighbours(dir) )
00361                    ENDDO
00362                    desc % neighbours(dir) =  (nmin + neighbours(dir) - 1)/neighbours(dir)
00363                    IF( desc % neighbours(dir) .GE. n_slices_tmp (dir) ) overlap=.TRUE.
00364                 ELSE
00365                    neighbours(dir) = 0
00366                 ENDIF
00367              ENDDO
00368              ! XXXXXXX I think the overlap criterium / neighbour calculation is too conservative
00369              ! write(6,*) n_slices_tmp,desc % neighbours, overlap
00370              IF (overlap) CYCLE
00371 
00372              ! a heuristic optimisation to reduce the memory usage
00373              ! we go for the smallest local to real volume
00374              ! volume of the box without the wings / volume of the box with the wings
00375              ! with prefactodesc to promote less cuts in Z dimension
00376              ratio = PRODUCT(REAL(desc % npts,KIND=dp) / n_slices_tmp ) /  
00377                      PRODUCT(REAL(desc % npts,KIND=dp) / n_slices_tmp  +   
00378                         MERGE((/0.0,0.0,0.0/),2*(/1.06*nmin,1.05*nmin,1.03*nmin/),n_slices_tmp==(/1,1,1/)))
00379              IF (ratio>ratio_best) THEN
00380                 ratio_best=ratio
00381                 n_slices=n_slices_tmp
00382              ENDIF
00383 
00384           ENDDO
00385           ENDDO
00386 
00387           ! if automatic we can still decide this is a replicated grid
00388           ! if the memory gain (or the gain is messages) is too small.
00389           IF (input_settings % distribution_type == rsgrid_automatic) THEN
00390              volume=PRODUCT(REAL(desc % npts,KIND=dp))
00391              volume_dist=PRODUCT(REAL(desc % npts,KIND=dp) / n_slices  +   
00392                           MERGE((/0,0,0/),2*(/nmin,nmin,nmin/),n_slices==(/1,1,1/)))
00393              IF (volume<volume_dist*input_settings%memory_factor) THEN
00394                  n_slices=1
00395              ENDIF
00396           ENDIF
00397 
00398        END IF
00399 
00400        desc % group_dim (:) = n_slices(:)
00401        CALL mp_comm_dup( pw_grid % para % group , desc % group )
00402        CALL mp_environ(desc % group_size, desc % my_pos, desc % group )
00403 
00404        IF ( ALL (n_slices == 1 ) ) THEN
00405           ! CASE 1 : only one slice: we do not need overlapping regions and special
00406           !          recombination of the total density
00407           desc % perd = 1
00408           desc % border = 0
00409           desc % parallel = .TRUE.
00410           desc % distributed = .FALSE.
00411           desc % group_head = pw_grid % para % group_head
00412           desc % group_coor ( : ) = 0
00413           desc % my_virtual_pos = 0
00414 
00415           ALLOCATE ( desc % virtual2real ( 0 : desc % group_size - 1) )
00416           ALLOCATE ( desc % real2virtual ( 0 : desc % group_size - 1) )
00417           ! Start with no reordering
00418           DO i=0, desc % group_size - 1
00419             desc % virtual2real(i) = i
00420             desc % real2virtual(i) = i
00421           END DO
00422 
00423        ELSE
00424           ! CASE 2 : general case
00425           ! periodicity is no longer enforced arbritary  directions
00426           desc % perd = 1
00427           DO dir = 1,3
00428              IF (n_slices(dir).GT.1) THEN
00429                 desc % perd ( dir ) = 0
00430              ENDIF
00431           ENDDO
00432           ! we keep a border of nmin points
00433           desc % border = nmin
00434           ! we are going parallel on the real space grid
00435           desc % parallel = .TRUE.
00436           desc % distributed = .TRUE.
00437           desc % group_head = ( desc % my_pos == 0 )
00438 
00439           ! set up global info about the distribution
00440           ALLOCATE( desc % rank2coord(3,0:desc % group_size-1),STAT=stat)
00441           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00442           ALLOCATE( desc % coord2rank(0:desc % group_dim(1)-1,0:desc % group_dim(2)-1,0:desc % group_dim(3)-1),STAT=stat)
00443           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00444           ALLOCATE( desc % lb_global(3,0:desc % group_size-1),STAT=stat)
00445           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00446           ALLOCATE( desc % ub_global(3,0:desc % group_size-1),STAT=stat)
00447           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00448           ALLOCATE( desc % x2coord(desc % lb(1):desc % ub(1)),STAT=stat)
00449           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00450           ALLOCATE( desc % y2coord(desc % lb(2):desc % ub(2)),STAT=stat)
00451           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00452           ALLOCATE( desc % z2coord(desc % lb(3):desc % ub(3)),STAT=stat)
00453           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00454 
00455           DO i=0, desc% group_size - 1
00456              ! Calculate coordinates in a row-major order (to be SMP-friendly)
00457              desc % rank2coord(1,i) = i / ( desc % group_dim(2) * desc % group_dim(3) )
00458              desc % rank2coord(2,i) = MODULO (i, desc % group_dim(2) * desc % group_dim(3) ) &
00459                                       / desc % group_dim(3)
00460              desc % rank2coord(3,i) = MODULO (i, desc % group_dim(3))
00461 
00462              IF (i == desc% my_pos ) THEN
00463                desc % group_coor = desc % rank2coord(:,i)
00464              END IF
00465 
00466              desc % coord2rank(desc % rank2coord(1,i),desc % rank2coord(2,i),desc % rank2coord(3,i))=i
00467              ! the lb_global and ub_global correspond to lb_real and ub_real of each task
00468              desc % lb_global(:,i) = desc % lb
00469              desc % ub_global(:,i) = desc % ub
00470              DO dir = 1,3
00471                 IF (desc % group_dim(dir).GT.1) THEN
00472                    lb = get_limit ( desc % npts ( dir ), desc % group_dim ( dir ), desc % rank2coord ( dir, i ) )
00473                    desc % lb_global ( dir, i ) = lb ( 1 ) + desc % lb ( dir ) - 1
00474                    desc % ub_global ( dir, i ) = lb ( 2 ) + desc % lb ( dir ) - 1
00475                 ENDIF
00476              ENDDO
00477           ENDDO
00478 
00479           ! map a grid point to a CPU coord
00480           DO dir=1,3
00481              DO l=0,desc % group_dim ( dir )-1
00482                IF (desc % group_dim(dir).GT.1) THEN
00483                    lb = get_limit ( desc % npts ( dir ), desc % group_dim ( dir ), l )
00484                    lb = lb + desc % lb ( dir ) - 1
00485                ELSE
00486                    lb(1) = desc % lb ( dir )
00487                    lb(2) = desc % ub ( dir )
00488                ENDIF
00489                SELECT CASE(dir)
00490                CASE(1)
00491                  desc % x2coord(lb(1):lb(2))=l
00492                CASE(2)
00493                  desc % y2coord(lb(1):lb(2))=l
00494                CASE(3)
00495                  desc % z2coord(lb(1):lb(2))=l
00496                END SELECT
00497              ENDDO
00498           ENDDO
00499 
00500           ! an upper bound for the number of neighbours the border is overlapping with
00501           DO dir=1,3
00502              desc % neighbours(dir) = 0
00503              IF (n_slices(dir).GT.1) THEN
00504                 neighbours(dir) = HUGE(0)
00505                 DO l=0,n_slices(dir)-1
00506                    lb = get_limit ( desc % npts ( dir ),  n_slices( dir ), l )
00507                    neighbours(dir) = MIN(lb (2) -lb (1) + 1, neighbours(dir) )
00508                 ENDDO
00509                 desc % neighbours(dir) =  (desc % border + neighbours(dir) - 1)/neighbours(dir)
00510              ENDIF
00511           ENDDO
00512 
00513           ALLOCATE ( desc % virtual2real ( 0 : desc % group_size - 1) )
00514           ALLOCATE ( desc % real2virtual ( 0 : desc % group_size - 1) )
00515           ! Start with no reordering
00516           DO i=0, desc % group_size - 1
00517             desc % virtual2real(i) = i
00518             desc % real2virtual(i) = i
00519           END DO
00520 
00521           desc % my_virtual_pos = desc % real2virtual(desc % my_pos)
00522           desc % virtual_group_coor( : ) = desc % rank2coord(:, desc % my_virtual_pos)
00523 
00524        END IF
00525     END IF
00526 
00527     CALL timestop(handle)
00528 
00529   END SUBROUTINE rs_grid_create_descriptor
00530 
00531   SUBROUTINE rs_grid_create ( rs, desc, error)
00532     TYPE(realspace_grid_type), POINTER       :: rs
00533     TYPE(realspace_grid_desc_type), POINTER  :: desc
00534     TYPE(cp_error_type), INTENT(inout)       :: error
00535 
00536     CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_create', 
00537       routineP = moduleN//':'//routineN
00538 
00539     INTEGER                                  :: handle, stat
00540     LOGICAL                                  :: failure
00541 
00542     CALL timeset(routineN,handle)
00543 
00544     ALLOCATE(rs,stat=stat)
00545     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00546 
00547     last_rs_id = last_rs_id+1
00548     rs % id_nr = last_rs_id
00549     rs % ref_count = 1
00550     rs % desc => desc
00551     CALL rs_grid_retain_descriptor(rs % desc, error)
00552 
00553     IF ( desc % pw % para % mode == PW_MODE_LOCAL ) THEN
00554        ! The corresponding group has dimension 1
00555        ! All operations will be done localy
00556        rs % npts_local = desc % npts
00557        rs % ngpts_local = PRODUCT ( desc % npts )
00558        rs % lb_local = desc % pw % bounds ( 1, : )
00559        rs % ub_local = desc % pw % bounds ( 2, : )
00560        rs % lb_real = desc % pw % bounds ( 1, : )
00561        rs % ub_real = desc % pw % bounds ( 2, : )
00562     END IF
00563 
00564     IF ( ALL (rs % desc % group_dim == 1 ) ) THEN
00565           ! CASE 1 : only one slice: we do not need overlapping regions and special
00566           !          recombination of the total density
00567           rs % npts_local = desc % pw % npts
00568           rs % ngpts_local = PRODUCT ( desc % npts )
00569           rs % lb_local = desc % lb
00570           rs % ub_local = desc % ub
00571           rs % lb_real = desc % lb
00572           rs % ub_real = desc % ub
00573     ELSE
00574           ! CASE 2 : general case
00575           ! extract some more derived quantities about the local grid
00576           rs % lb_real = desc % lb_global ( :, desc % my_virtual_pos )
00577           rs % ub_real = desc % ub_global ( :, desc % my_virtual_pos )
00578           rs % lb_local = rs % lb_real - desc % border * ( 1 - desc % perd )
00579           rs % ub_local = rs % ub_real + desc % border * ( 1 - desc % perd )
00580           rs % npts_local = rs % ub_local - rs % lb_local +1
00581           rs % ngpts_local = PRODUCT ( rs % npts_local )
00582     END IF
00583 
00584     allocated_rs_grid_count = allocated_rs_grid_count + 1
00585 
00586     ALLOCATE ( rs % r (rs % lb_local(1):rs % ub_local(1), &
00587                        rs % lb_local(2):rs % ub_local(2), &
00588                        rs % lb_local(3):rs % ub_local(3)), STAT = stat )
00589     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00590     ALLOCATE ( rs % px ( desc % npts ( 1 ) ), STAT = stat )
00591     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00592     ALLOCATE ( rs % py ( desc % npts ( 2 ) ), STAT = stat )
00593     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00594     ALLOCATE ( rs % pz ( desc % npts ( 3 ) ), STAT = stat )
00595     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00596 
00597     CALL timestop(handle)
00598 
00599   END SUBROUTINE rs_grid_create
00600 
00601 ! *****************************************************************************
00615   SUBROUTINE rs_grid_reorder_ranks(desc, real2virtual, error)
00616 
00617     TYPE(realspace_grid_desc_type), POINTER  :: desc
00618     INTEGER, DIMENSION(:), INTENT(IN)        :: real2virtual
00619     TYPE(cp_error_type), INTENT(inout)       :: error
00620 
00621     CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_reorder_ranks', 
00622       routineP = moduleN//':'//routineN
00623 
00624     INTEGER                                  :: handle, i
00625 
00626     CALL timeset(routineN,handle)
00627 
00628     desc%real2virtual = real2virtual
00629 
00630     DO i=0, desc % group_size - 1
00631       desc % virtual2real(desc % real2virtual(i)) = i
00632     END DO
00633 
00634     desc % my_virtual_pos = desc % real2virtual(desc%my_pos)
00635 
00636     IF ( .NOT. ALL (desc % group_dim == 1 ) ) THEN
00637           desc % virtual_group_coor( : ) = desc % rank2coord(:, desc % my_virtual_pos)
00638     END IF
00639 
00640     CALL timestop(handle)
00641 
00642   END SUBROUTINE
00643 
00644 ! *****************************************************************************
00648   SUBROUTINE rs_grid_print ( rs, iounit, error)
00649     TYPE(realspace_grid_type), POINTER       :: rs
00650     INTEGER, INTENT(in)                      :: iounit
00651     TYPE(cp_error_type), INTENT(inout)       :: error
00652 
00653     CHARACTER(LEN=*), PARAMETER :: routineN = 'rs_grid_print', 
00654       routineP = moduleN//':'//routineN
00655 
00656     INTEGER                                  :: dir, i, nn
00657     LOGICAL                                  :: failure
00658     REAL(KIND=dp)                            :: pp( 3 )
00659 
00660     failure = .FALSE.
00661 
00662     IF ( rs % desc % parallel ) THEN
00663        IF ( iounit>0) THEN
00664           WRITE ( iounit, '(/,A,T71,I10)' ) &
00665                " RS_GRID: Information for grid number ", rs % desc % grid_id
00666           DO i = 1, 3
00667              WRITE ( iounit, '(A,I3,T30,2I8,T62,A,T71,I10)' ) " RS_GRID:   Bounds ", &
00668                   i, rs % desc % lb ( i ), rs % desc % ub ( i ), "Points:", rs % desc % npts ( i )
00669           END DO
00670           IF ( .NOT. rs % desc % distributed ) THEN
00671              WRITE ( iounit, '(A)' ) " RS_GRID: Real space fully replicated"
00672              WRITE ( iounit, '(A,T71,I10)' ) &
00673                   " RS_GRID: Group size ", rs % desc % group_dim (2)
00674           ELSE
00675              DO dir = 1,3
00676                 IF ( rs % desc % perd (dir) /= 1 ) THEN
00677                    WRITE ( iounit, '(A,T71,I3,A)' ) &
00678                         " RS_GRID: Real space distribution over ", rs % desc % group_dim ( dir ), " groups"
00679                    WRITE ( iounit, '(A,T71,I10)' ) &
00680                         " RS_GRID: Real space distribution along direction ", dir
00681                    WRITE ( iounit, '(A,T71,I10)' ) &
00682                         " RS_GRID: Border size ", rs % desc % border
00683                 END IF
00684              END DO
00685           END IF
00686        END IF
00687        IF ( rs % desc %distributed ) THEN
00688           DO dir = 1 ,3
00689              IF ( rs % desc % perd (dir) /= 1 ) THEN
00690                 nn = rs % npts_local ( dir )
00691                 CALL mp_sum ( nn, rs % desc % group )
00692                 pp ( 1 ) = REAL ( nn, KIND=dp ) / REAL ( PRODUCT ( rs % desc % group_dim ), KIND=dp )
00693                 nn = rs % npts_local ( dir )
00694                 CALL mp_max ( nn, rs % desc % group )
00695                 pp ( 2 ) = REAL ( nn, KIND=dp )
00696                 nn = rs % npts_local ( dir )
00697                 CALL mp_min ( nn, rs % desc % group )
00698                 pp ( 3 ) = REAL ( nn, KIND=dp )
00699                 IF ( iounit > 0) THEN
00700                    WRITE ( iounit, '(A,T48,A)' ) " RS_GRID:   Distribution", &
00701                         "  Average         Max         Min"
00702                    WRITE ( iounit, '(A,T45,F12.1,2I12)' ) " RS_GRID:   Planes   ", &
00703                         pp ( 1 ), NINT ( pp ( 2 ) ), NINT ( pp ( 3 ) )
00704                 END IF
00705              END IF
00706           END DO
00707 !          WRITE ( iounit, '(/)' )
00708        END IF
00709     ELSE
00710        IF (iounit>0) THEN
00711          WRITE ( iounit, '(/,A,T71,I10)' ) &
00712               " RS_GRID: Information for grid number ", rs % desc % grid_id
00713          DO i = 1, 3
00714             WRITE ( iounit, '(A,I3,T30,2I8,T62,A,T71,I10)' ) " RS_GRID:   Bounds ", &
00715                  i, rs % desc % lb ( i ), rs % desc % ub ( i ), "Points:", rs % desc % npts ( i )
00716          END DO
00717 !         WRITE ( iounit, '(/)' )
00718        ENDIF
00719     END IF
00720 
00721   END SUBROUTINE rs_grid_print
00722 
00723 ! *****************************************************************************
00732   SUBROUTINE rs_pw_transfer ( rs, pw, dir, error)
00733 
00734     TYPE(realspace_grid_type), POINTER       :: rs
00735     TYPE(pw_type), POINTER                   :: pw
00736     INTEGER, INTENT(IN)                      :: dir
00737     TYPE(cp_error_type), INTENT(INOUT)       :: error
00738 
00739     CHARACTER(len=*), PARAMETER :: routineN = 'rs_pw_transfer', 
00740       routineP = moduleN//':'//routineN
00741 
00742     CHARACTER(LEN=5)                         :: dirname
00743     INTEGER                                  :: handle, handle2, nn
00744     LOGICAL                                  :: failure
00745 
00746     failure = .FALSE.
00747     CALL timeset(routineN,handle2)
00748     IF (dir.EQ.rs2pw) dirname="RS2PW"
00749     IF (dir.EQ.pw2rs) dirname="PW2RS"
00750     CALL timeset(routineN//"_"//TRIM(dirname)//"_"// TRIM(ADJUSTL(&
00751               cp_to_string(CEILING(pw%pw_grid%cutoff/10)*10))),handle)
00752 
00753     IF ( rs % desc % grid_id /= pw % pw_grid % id_nr ) &
00754        CALL stop_program(routineN,moduleN,__LINE__,"Different rs and pw indentitfiers")
00755     IF ( (pw%in_use .NE. REALDATA3D) .AND. (pw%in_use .NE. COMPLEXDATA3D)) &
00756        CALL stop_program(routineN,moduleN,__LINE__,"Need REALDATA3D or COMPLEXDATA3D")
00757     IF ( (dir.NE.rs2pw) .AND. (dir.NE.pw2rs) ) &
00758        CALL stop_program(routineN,moduleN,__LINE__,"Direction must be rs2pw or pw2rs")
00759 
00760     IF (  rs % desc % parallel ) THEN
00761        IF ( .NOT. rs % desc % distributed ) THEN
00762           CALL rs_pw_transfer_replicated(rs,pw,dir,error)
00763        ELSE
00764           CALL rs_pw_transfer_distributed(rs,pw,dir,error)
00765        END IF
00766     ELSE ! treat simple serial case locally
00767        nn = SIZE ( rs % r )
00768        IF ( dir == rs2pw ) THEN
00769           IF ( pw % in_use == REALDATA3D ) THEN
00770              CALL dcopy ( nn, rs % r, 1, pw % cr3d, 1 )
00771           ELSEIF ( pw % in_use == COMPLEXDATA3D ) THEN
00772              CALL copy_rc ( nn, rs % r, pw % cc3d )
00773           ELSE
00774              CALL stop_program(routineN,moduleN,__LINE__,"PW type not compatible")
00775           END IF
00776        ELSE
00777           IF ( pw % in_use == REALDATA3D ) THEN
00778              CALL dcopy ( nn, pw % cr3d, 1, rs % r, 1 )
00779           ELSEIF ( pw % in_use == COMPLEXDATA3D ) THEN
00780              CALL copy_cr ( nn, pw % cc3d, rs % r )
00781           ELSE
00782              CALL stop_program(routineN,moduleN,__LINE__,"PW type not compatible")
00783           END IF
00784        END IF
00785     END IF
00786     CALL timestop(handle)
00787     CALL timestop(handle2)
00788 
00789   END SUBROUTINE rs_pw_transfer
00790 
00791 ! *****************************************************************************
00798   SUBROUTINE rs_pw_transfer_replicated(rs,pw,dir,error)
00799     TYPE(realspace_grid_type), POINTER       :: rs
00800     TYPE(pw_type), POINTER                   :: pw
00801     INTEGER, INTENT(IN)                      :: dir
00802     TYPE(cp_error_type), INTENT(INOUT)       :: error
00803 
00804     CHARACTER(len=*), PARAMETER :: routineN = 'rs_pw_transfer_replicated', 
00805       routineP = moduleN//':'//routineN
00806 
00807     INTEGER                                  :: dest, group, ii, ip, ix, iy, 
00808                                                 iz, mepos, nma, nn, np, 
00809                                                 req(2), s(3), source, stat
00810     INTEGER, ALLOCATABLE, DIMENSION(:)       :: rcount
00811     INTEGER, DIMENSION(3)                    :: lb, ub
00812     INTEGER, DIMENSION(:, :), POINTER        :: pbo
00813     INTEGER, DIMENSION(:, :, :), POINTER     :: bo
00814     LOGICAL                                  :: failure
00815     REAL(KIND=dp), DIMENSION(:), POINTER     :: recvbuf, sendbuf, swapptr
00816     REAL(KIND=dp), DIMENSION(:, :, :), 
00817       POINTER                                :: grid
00818 
00819     failure = .FALSE.
00820     np = pw % pw_grid % para % group_size
00821     bo => pw % pw_grid % para % bo (1:2,1:3,0:np-1,1)
00822     pbo => pw % pw_grid % bounds
00823     group = pw % pw_grid % para % rs_group
00824     mepos = pw % pw_grid % para % rs_mpo
00825     ALLOCATE ( rcount ( 0 : np - 1 ), STAT = stat )
00826     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00827     DO ip = 1, np
00828        rcount ( ip-1 ) = PRODUCT ( bo(2,:,ip) - bo(1,:,ip) + 1 )
00829     END DO
00830     nma = MAXVAL ( rcount ( 0 : np - 1 ) )
00831     ALLOCATE(sendbuf(nma),recvbuf(nma), STAT=stat)
00832     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00833     grid=>rs%r
00834 
00835     IF ( dir == rs2pw ) THEN
00836        dest  =MODULO(mepos+1,np)
00837        source=MODULO(mepos-1,np)
00838        sendbuf=0.0_dp
00839 
00840        DO ip = 1, np
00841 
00842           lb = pbo(1,:)+bo(1,:,MODULO(mepos-ip,np)+1)-1
00843           ub = pbo(1,:)+bo(2,:,MODULO(mepos-ip,np)+1)-1
00844           ! this loop takes about the same time as the message passing call
00845           ! notice that the range of ix is only a small fraction of the first index of grid
00846           ! therefore it seems faster to have the second index as the innermost loop
00847           ! if this runs on many cpus
00848           ! tested on itanium, pentium4, opteron, ultrasparc...
00849           s=ub-lb+1
00850           DO iz = lb(3), ub(3)
00851              DO ix = lb(1), ub(1)
00852                 ii= (iz-lb(3))*s(1)*s(2)+(ix-lb(1))+1
00853                 DO iy = lb(2), ub(2)
00854                    sendbuf(ii) = sendbuf(ii) + grid(ix,iy,iz)
00855                    ii=ii+s(1)
00856                 END DO
00857              END DO
00858           END DO
00859           IF ( ip .EQ. np ) EXIT
00860           CALL mp_isendrecv(sendbuf,dest,recvbuf,source, &
00861                group,req(1),req(2),13)
00862           CALL mp_waitall(req)
00863           swapptr=>sendbuf
00864           sendbuf=>recvbuf
00865           recvbuf=>swapptr
00866        ENDDO
00867        nn = rcount(mepos)
00868        IF ( pw % in_use == REALDATA3D ) THEN
00869           CALL dcopy ( nn, sendbuf, 1, pw % cr3d, 1 )
00870        ELSEIF ( pw % in_use == COMPLEXDATA3D ) THEN
00871           CALL copy_rc ( nn, sendbuf, pw % cc3d )
00872        ELSE
00873           CALL stop_program(routineN,moduleN,__LINE__,"PW type not compatible")
00874        END IF
00875     ELSE
00876        nn = rcount ( mepos )
00877        IF ( pw % in_use == REALDATA3D ) THEN
00878           CALL dcopy ( nn, pw % cr3d, 1, sendbuf, 1 )
00879        ELSEIF ( pw % in_use == COMPLEXDATA3D ) THEN
00880           CALL dcopy ( nn, pw % cc3d, 2, sendbuf, 1 )
00881        ELSE
00882           CALL stop_program(routineN,moduleN,__LINE__,"PW type not compatible")
00883        END IF
00884 
00885        dest  =MODULO(mepos+1,np)
00886        source=MODULO(mepos-1,np)
00887 
00888        DO ip = 0, np-1
00889           ! we must shift the buffer only np-1 times around
00890           IF (ip .NE. np-1) THEN
00891              CALL mp_isendrecv(sendbuf,dest,recvbuf,source, &
00892                   group,req(1),req(2),13)
00893           ENDIF
00894           lb = pbo(1,:)+bo(1,:,MODULO(mepos-ip,np)+1)-1
00895           ub = pbo(1,:)+bo(2,:,MODULO(mepos-ip,np)+1)-1
00896           ii = 0
00897           ! this loop takes about the same time as the message passing call
00898           ! If I read the code correctly then:
00899           ! BUG/FIXME: Unfortunately MPI standard says we are not allowed
00900           ! to access the sendbuf/recvbuf while the sendrecv is still not
00901           ! finished. This EXPLICITLY includes READING sendbuf!
00902           ! I don't know any MPI implementation that really requires this
00903           ! but the standard explicitly mandates it.
00904           DO iz = lb(3), ub(3)
00905              DO iy = lb(2), ub(2)
00906                 DO ix = lb(1), ub(1)
00907                    ii=ii+1
00908                    grid(ix,iy,iz) = sendbuf(ii)
00909                 END DO
00910              END DO
00911           END DO
00912           IF (ip .NE. np-1) THEN
00913              CALL mp_waitall(req)
00914           ENDIF
00915           swapptr=>sendbuf
00916           sendbuf=>recvbuf
00917           recvbuf=>swapptr
00918        ENDDO
00919 
00920     END IF
00921 
00922     DEALLOCATE ( rcount, STAT=stat )
00923     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00924     DEALLOCATE ( sendbuf, STAT=stat )
00925     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00926     DEALLOCATE ( recvbuf, STAT=stat )
00927     CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
00928   END SUBROUTINE rs_pw_transfer_replicated
00929 
00930 ! *****************************************************************************
00950   SUBROUTINE rs_pw_transfer_distributed(rs,pw,dir,error)
00951     TYPE(realspace_grid_type), POINTER       :: rs
00952     TYPE(pw_type), POINTER                   :: pw
00953     INTEGER, INTENT(IN)                      :: dir
00954     TYPE(cp_error_type), INTENT(INOUT)       :: error
00955 
00956     CHARACTER(len=*), PARAMETER :: routineN = 'rs_pw_transfer_distributed', 
00957       routineP = moduleN//':'//routineN
00958 
00959     CHARACTER(LEN=200)                       :: error_string
00960     INTEGER :: completed, dest_down, dest_up, i, idir, j, k, lb, my_id, 
00961       my_pw_rank, my_rs_rank, n_shifts, nn, num_threads, position, 
00962       source_down, source_up, stat, ub, x, y, z
00963     INTEGER, ALLOCATABLE, DIMENSION(:)       :: dshifts, recv_disps, 
00964                                                 recv_reqs, recv_sizes, 
00965                                                 send_disps, send_reqs, 
00966                                                 send_sizes, ushifts
00967     INTEGER, ALLOCATABLE, DIMENSION(:, :)    :: bounds, recv_tasks, send_tasks
00968     INTEGER, DIMENSION(2)                    :: neighbours, pos
00969     INTEGER, DIMENSION(3) :: coords, lb_recv, lb_recv_down, lb_recv_up, 
00970       lb_send, lb_send_down, lb_send_up, ub_recv, ub_recv_down, ub_recv_up, 
00971       ub_send, ub_send_down, ub_send_up
00972     INTEGER, DIMENSION(4)                    :: req
00973     LOGICAL                                  :: failure
00974     LOGICAL, DIMENSION(3)                    :: halo_swapped
00975     REAL(KIND=dp)                            :: pw_sum, rs_sum
00976     REAL(KIND=dp), DIMENSION(:, :, :), 
00977       POINTER                                :: recv_buf_3d_down, 
00978                                                 recv_buf_3d_up, 
00979                                                 send_buf_3d_down, 
00980                                                 send_buf_3d_up
00981     TYPE(cp_1d_r_p_type), ALLOCATABLE, 
00982       DIMENSION(:)                           :: recv_bufs, send_bufs
00983 
00984 !$  INTEGER :: omp_get_max_threads, omp_get_thread_num
00985 
00986     num_threads = 1
00987     my_id = 0
00988 
00989     IF ( dir == rs2pw ) THEN
00990 
00991        ! safety check, to be removed once we're absolute sure the routine is correct
00992        IF (debug_this_module) THEN
00993           rs_sum=accurate_sum(rs%r)*ABS(det_3x3(rs%desc % dh))
00994           CALL mp_sum(rs_sum,rs%desc%group)
00995        ENDIF
00996 
00997        halo_swapped = .FALSE.
00998        ! We don't need to send the 'edges' of the halos that have already been sent
00999        ! Halos are contiguous in memory in z-direction only, so swap these first,
01000        ! and send less data in the y and x directions which are more expensive
01001 
01002        DO idir = 3,1,-1
01003 
01004           IF ( rs % desc % perd (idir) .NE. 1) THEN
01005 
01006              ALLOCATE ( dshifts ( 0:rs % desc % neighbours (idir ) ), STAT=stat )
01007              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01008              ALLOCATE ( ushifts ( 0:rs % desc % neighbours (idir ) ), STAT=stat )
01009              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01010 
01011              ushifts = 0
01012              dshifts = 0
01013 
01014              ! check that we don't try to send data to ourself
01015              DO n_shifts = 1, MIN(rs % desc % neighbours(idir),rs%desc % group_dim(idir)-1)
01016 
01017                 ! need to take into account the possible varying widths of neighbouring cells
01018                 ! offset_up and offset_down hold the real size of the neighbouring cells
01019                 position = MODULO ( rs % desc % virtual_group_coor (idir) - n_shifts, rs % desc % group_dim (idir))
01020                 neighbours = get_limit ( rs % desc % npts ( idir ), rs % desc % group_dim ( idir ), position )
01021                 dshifts(n_shifts) = dshifts(n_shifts-1) + ( neighbours (2) - neighbours (1) + 1 )
01022 
01023                 position = MODULO ( rs % desc % virtual_group_coor (idir) + n_shifts, rs % desc % group_dim (idir))
01024                 neighbours = get_limit ( rs % desc % npts ( idir ), rs % desc % group_dim ( idir ), position )
01025                 ushifts(n_shifts) = ushifts(n_shifts-1) + ( neighbours (2) - neighbours (1) + 1 )
01026 
01027                 ! The border data has to be send/received from the neighbours
01028                 ! First we calculate the source and destination processes for the shift
01029                 ! We do both shifts at once to allow for more overlap of communication and buffer packing/unpacking
01030 
01031                 CALL cart_shift ( rs, idir, -1 * n_shifts, source_down, dest_down)
01032 
01033                 lb_send_down ( : ) = rs % lb_local ( : )
01034                 lb_recv_down ( : ) = rs % lb_local ( : )
01035                 ub_recv_down ( : ) = rs % ub_local ( : )
01036                 ub_send_down ( : ) = rs % ub_local ( : )
01037 
01038                 IF(dshifts (n_shifts - 1) .LE. rs % desc % border ) THEN
01039                    ub_send_down ( idir ) = lb_send_down ( idir ) + rs % desc % border  - 1 - dshifts(n_shifts-1)
01040                    lb_send_down ( idir ) = MAX( lb_send_down ( idir ),&
01041                         lb_send_down ( idir ) + rs % desc % border  - dshifts ( n_shifts ) )
01042 
01043                    ub_recv_down ( idir ) = ub_recv_down ( idir ) - rs % desc % border
01044                    lb_recv_down ( idir ) = MAX(lb_recv_down(idir) + rs % desc % border,&
01045                         ub_recv_down ( idir ) - rs % desc % border + 1 + ushifts(n_shifts-1) )
01046                 ELSE
01047                    lb_send_down( idir ) = 0
01048                    ub_send_down( idir ) = -1
01049                    lb_recv_down( idir ) = 0
01050                    ub_recv_down( idir ) = -1
01051                 ENDIF
01052 
01053                 DO i=1,3
01054                    IF ( halo_swapped(i) ) THEN
01055                       lb_send_down(i) = rs % lb_real(i)
01056                       ub_send_down(i) = rs % ub_real(i)
01057                       lb_recv_down(i) = rs % lb_real(i)
01058                       ub_recv_down(i) = rs % ub_real(i)
01059                    ENDIF
01060                 ENDDO
01061 
01062                 ! post the recieve
01063                 ALLOCATE ( recv_buf_3d_down (lb_recv_down(1):ub_recv_down(1), &
01064                      lb_recv_down(2):ub_recv_down(2), lb_recv_down(3):ub_recv_down(3)),stat=stat)
01065                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01066                 CALL mp_irecv (recv_buf_3d_down, source_down, rs % desc % group, req(1))
01067 
01068                 ! now allocate, pack and send the send buffer
01069                 nn = PRODUCT ( ub_send_down - lb_send_down + 1 )
01070                 ALLOCATE ( send_buf_3d_down ( lb_send_down(1):ub_send_down(1), &
01071                      lb_send_down(2):ub_send_down(2), lb_send_down(3):ub_send_down(3) ), STAT=stat )
01072                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01073 
01074 !$omp parallel default(none), &
01075 !$omp          private(lb,ub,my_id,num_threads), &
01076 !$omp          shared(send_buf_3d_down,rs,lb_send_down,ub_send_down)
01077 !$              num_threads = MIN(omp_get_max_threads(), ub_send_down(3)-lb_send_down(3)+1)
01078 !$              my_id = omp_get_thread_num()
01079                 IF (my_id < num_threads) THEN
01080                   lb = lb_send_down(3) + ((ub_send_down(3)-lb_send_down(3)+1)*my_id)/num_threads
01081                   ub = lb_send_down(3) + ((ub_send_down(3)-lb_send_down(3)+1)*(my_id+1))/num_threads - 1
01082 
01083                   send_buf_3d_down(lb_send_down(1):ub_send_down(1), lb_send_down(2):ub_send_down(2), &
01084                                    lb:ub) = rs % r ( lb_send_down(1):ub_send_down(1), &
01085                                    lb_send_down(2):ub_send_down(2), lb:ub )
01086                 END IF
01087 !$omp end parallel
01088 
01089                 CALL mp_isend (send_buf_3d_down, dest_down, rs % desc % group, req(3))
01090 
01091                 ! Now for the other direction
01092                 CALL cart_shift (rs, idir, n_shifts, source_up, dest_up )
01093 
01094                 lb_send_up ( : ) = rs % lb_local ( : )
01095                 lb_recv_up ( : ) = rs % lb_local ( : )
01096                 ub_recv_up ( : ) = rs % ub_local ( : )
01097                 ub_send_up ( : ) = rs % ub_local ( : )
01098 
01099                 IF (ushifts(n_shifts - 1) .LE. rs % desc % border ) THEN
01100 
01101                    lb_send_up ( idir ) = ub_send_up ( idir ) - rs % desc % border + 1 + ushifts(n_shifts-1)
01102                    ub_send_up ( idir ) = MIN( ub_send_up( idir ),&
01103                         ub_send_up ( idir ) - rs % desc % border  + ushifts( n_shifts ) )
01104 
01105                    lb_recv_up ( idir ) = lb_recv_up ( idir ) + rs % desc % border
01106                    ub_recv_up ( idir ) = MIN(ub_recv_up (idir)- rs%desc % border,&
01107                         lb_recv_up ( idir ) + rs % desc % border - 1 - dshifts(n_shifts-1))
01108                 ELSE
01109                    lb_send_up( idir ) = 0
01110                    ub_send_up( idir ) = -1
01111                    lb_recv_up( idir ) = 0
01112                    ub_recv_up( idir ) = -1
01113                 ENDIF
01114 
01115                 DO i=1,3
01116                    IF ( halo_swapped(i) ) THEN
01117                       lb_send_up(i) = rs % lb_real(i)
01118                       ub_send_up(i) = rs % ub_real(i)
01119                       lb_recv_up(i) = rs % lb_real(i)
01120                       ub_recv_up(i) = rs % ub_real(i)
01121                    ENDIF
01122                 ENDDO
01123 
01124                 ! post the recieve
01125                 ALLOCATE ( recv_buf_3d_up (lb_recv_up(1):ub_recv_up(1), &
01126                      lb_recv_up(2):ub_recv_up(2), lb_recv_up(3):ub_recv_up(3)),STAT=stat)
01127                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01128                 CALL mp_irecv(recv_buf_3d_up, source_up, rs % desc % group, req(2))
01129 
01130                 ! now allocate,pack and send the send buffer
01131                 nn = PRODUCT ( ub_send_up - lb_send_up + 1 )
01132                 ALLOCATE ( send_buf_3d_up ( lb_send_up(1):ub_send_up(1), &
01133                      lb_send_up(2):ub_send_up(2), lb_send_up(3):ub_send_up(3) ), STAT=stat )
01134                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01135 
01136 !$omp parallel default(none), &
01137 !$omp          private(lb,ub,my_id,num_threads), &
01138 !$omp          shared(send_buf_3d_up,rs,lb_send_up,ub_send_up)
01139 !$              num_threads = MIN(omp_get_max_threads(), ub_send_up(3)-lb_send_up(3)+1)
01140 !$              my_id = omp_get_thread_num()
01141                 IF (my_id < num_threads) THEN
01142                   lb = lb_send_up(3) + ((ub_send_up(3)-lb_send_up(3)+1)*my_id)/num_threads
01143                   ub = lb_send_up(3) + ((ub_send_up(3)-lb_send_up(3)+1)*(my_id+1))/num_threads - 1
01144 
01145                   send_buf_3d_up ( lb_send_up(1):ub_send_up(1), lb_send_up(2):ub_send_up(2), &
01146                                    lb:ub ) = rs % r ( lb_send_up(1):ub_send_up(1), &
01147                                    lb_send_up(2):ub_send_up(2), lb:ub )
01148                 END IF
01149 !$omp end parallel
01150 
01151                 CALL mp_isend (send_buf_3d_up, dest_up, rs % desc % group, req(4))
01152 
01153                 ! wait for a recv to complete, then we can unpack
01154 
01155                 DO i=1,2
01156 
01157                    CALL mp_waitany (req(1:2),completed)
01158 
01159                    IF ( completed .EQ. 1 ) THEN
01160 
01161                       ! only some procs may need later shifts
01162                       IF(ub_recv_down(idir).GE.lb_recv_down(idir))THEN
01163                          ! Sum the data in the RS Grid
01164 !$omp parallel default(none), &
01165 !$omp          private(lb,ub,my_id,num_threads), &
01166 !$omp          shared(recv_buf_3d_down,rs,lb_recv_down,ub_recv_down)
01167 !$                       num_threads = MIN(omp_get_max_threads(), ub_recv_down(3)-lb_recv_down(3)+1)
01168 !$                       my_id = omp_get_thread_num()
01169                          IF (my_id < num_threads) THEN
01170                            lb = lb_recv_down(3) + ((ub_recv_down(3)-lb_recv_down(3)+1)*my_id)/num_threads
01171                            ub = lb_recv_down(3) + ((ub_recv_down(3)-lb_recv_down(3)+1)*(my_id+1))/num_threads - 1
01172 
01173                            rs % r ( lb_recv_down(1):ub_recv_down(1), &
01174                                 lb_recv_down(2):ub_recv_down(2), lb:ub ) = &
01175                                 rs % r ( lb_recv_down(1):ub_recv_down(1), &
01176                                 lb_recv_down(2):ub_recv_down(2), lb:ub ) + &
01177                                 recv_buf_3d_down (:, :, lb:ub)
01178                          END IF
01179 !$omp end parallel
01180                       END IF
01181                       DEALLOCATE( recv_buf_3d_down, STAT=stat)
01182                       CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01183                    ELSE
01184 
01185                       ! only some procs may need later shifts
01186                       IF(ub_recv_up(idir).GE.lb_recv_up(idir))THEN
01187                          ! Sum the data in the RS Grid
01188 !$omp parallel default(none), &
01189 !$omp          private(lb,ub,my_id,num_threads), &
01190 !$omp          shared(recv_buf_3d_up,rs,lb_recv_up,ub_recv_up)
01191 !$              num_threads = MIN(omp_get_max_threads(), ub_recv_up(3)-lb_recv_up(3)+1)
01192 !$              my_id = omp_get_thread_num()
01193                          IF (my_id < num_threads) THEN
01194                            lb = lb_recv_up(3) + ((ub_recv_up(3)-lb_recv_up(3)+1)*my_id)/num_threads
01195                            ub = lb_recv_up(3) + ((ub_recv_up(3)-lb_recv_up(3)+1)*(my_id+1))/num_threads - 1
01196 
01197                            rs % r ( lb_recv_up(1):ub_recv_up(1), &
01198                                 lb_recv_up(2):ub_recv_up(2), lb:ub ) = &
01199                                 rs % r ( lb_recv_up(1):ub_recv_up(1), &
01200                                 lb_recv_up(2):ub_recv_up(2), lb:ub ) + &
01201                                 recv_buf_3d_up (:, :, lb:ub)
01202                          END IF
01203 !$omp end parallel
01204                       END IF
01205                       DEALLOCATE( recv_buf_3d_up, STAT=stat)
01206                       CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01207                    END IF
01208 
01209                 END DO
01210 
01211                 ! make sure the sends have completed before we deallocate
01212 
01213                 CALL mp_waitall (req(3:4))
01214 
01215                 DEALLOCATE( send_buf_3d_down, STAT=stat)
01216                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01217                 DEALLOCATE( send_buf_3d_up, STAT=stat)
01218                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01219              END DO
01220 
01221              DEALLOCATE ( dshifts, STAT=stat )
01222              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01223              DEALLOCATE ( ushifts, STAT=stat )
01224              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01225 
01226           END IF
01227 
01228           halo_swapped(idir) = .TRUE.
01229 
01230        END DO
01231 
01232        ! This is the real redistribution
01233        ALLOCATE ( bounds ( 0:pw % pw_grid % para % group_size - 1, 1:4 ), STAT=stat )
01234        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01235 
01236        ! work out the pw grid points each proc holds
01237        DO i = 0, pw % pw_grid % para % group_size - 1
01238           bounds ( i , 1:2 ) = pw % pw_grid % para % bo (1:2,1,i,1)
01239           bounds ( i , 3:4 ) = pw % pw_grid % para % bo (1:2,2,i,1)
01240           bounds ( i , 1:2 ) = bounds ( i , 1:2 ) - pw % pw_grid % npts (1) / 2 - 1
01241           bounds ( i , 3:4 ) = bounds ( i , 3:4 ) - pw % pw_grid % npts (2) / 2 - 1
01242        ENDDO
01243 
01244        ALLOCATE ( send_tasks ( 0:pw % pw_grid % para % group_size -1,1:6 ), STAT=stat )
01245        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01246        ALLOCATE ( send_sizes ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01247        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01248        ALLOCATE ( send_disps ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01249        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01250        ALLOCATE ( recv_tasks ( 0:pw % pw_grid % para % group_size -1,1:6 ), STAT=stat )
01251        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01252        ALLOCATE ( recv_sizes ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01253        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01254        ALLOCATE ( recv_disps ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01255        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01256        send_tasks = 0
01257        send_tasks(:,1)=1
01258        send_tasks(:,2)=0
01259        send_tasks(:,3)=1
01260        send_tasks(:,4)=0
01261        send_tasks(:,5)=1
01262        send_tasks(:,6)=0
01263        send_sizes = 0
01264        recv_tasks = 0
01265        recv_tasks(:,1)=1
01266        recv_tasks(:,2)=0
01267        send_tasks(:,3)=1
01268        send_tasks(:,4)=0
01269        send_tasks(:,5)=1
01270        send_tasks(:,6)=0
01271        recv_sizes = 0
01272 
01273        send_disps = 0
01274        recv_disps = 0
01275 
01276        my_rs_rank = rs % desc % my_pos
01277        my_pw_rank = pw % pw_grid % para % rs_mpo
01278 
01279        ! find the processors that should hold our data
01280        ! should be part of the rs grid type
01281        ! this is a loop over real ranks (i.e. the in-order cartesian ranks)
01282        ! do the recv and send tasks in two seperate loops which will
01283        ! load balance better for OpenMP with large numbers of MPI tasks
01284 
01285 !$omp parallel do default(none), &
01286 !$omp             private(coords,idir,pos,lb_send,ub_send), &
01287 !$omp             shared(rs,bounds,my_rs_rank,recv_tasks,recv_sizes)
01288        DO i = 0, rs % desc % group_size -1
01289 
01290           coords(:) = rs % desc % rank2coord ( : , rs % desc % real2virtual(i))
01291           !calculate the rs grid points on each processor
01292           !coords is the part of the grid that rank i actually holds
01293           DO idir = 1,3
01294              pos (:) = get_limit ( rs % desc %  npts ( idir ), rs % desc % group_dim ( idir ), coords(idir) )
01295              pos (:) = pos (:) - rs % desc %  npts (idir) / 2 - 1
01296              lb_send(idir) = pos(1)
01297              ub_send(idir) = pos(2)
01298           ENDDO
01299 
01300           IF (lb_send(1) .GT.bounds(my_rs_rank,2)) CYCLE
01301           IF (ub_send(1) .LT.bounds(my_rs_rank,1)) CYCLE
01302           IF (lb_send(2) .GT.bounds(my_rs_rank,4)) CYCLE
01303           IF (ub_send(2) .LT.bounds(my_rs_rank,3)) CYCLE
01304 
01305           recv_tasks(i,1)= MAX(lb_send(1),bounds(my_rs_rank,1))
01306           recv_tasks(i,2)= MIN(ub_send(1),bounds(my_rs_rank,2))
01307           recv_tasks(i,3)= MAX(lb_send(2),bounds(my_rs_rank,3))
01308           recv_tasks(i,4)= MIN(ub_send(2),bounds(my_rs_rank,4))
01309           recv_tasks(i,5)= lb_send(3)
01310           recv_tasks(i,6)= ub_send(3)
01311           recv_sizes(i)  = (recv_tasks(i,2)-recv_tasks( i ,1)+1)* &
01312              (recv_tasks( i ,4)-recv_tasks( i ,3)+1)*(recv_tasks( i ,6)-recv_tasks( i ,5)+1)
01313 
01314        ENDDO
01315 !$omp end parallel do
01316 
01317        coords(:) = rs % desc % rank2coord ( : , rs % desc % real2virtual(my_rs_rank))
01318        DO idir = 1,3
01319           pos (:) = get_limit ( rs % desc %  npts ( idir ), rs % desc % group_dim ( idir ), coords(idir) )
01320           pos (:) = pos (:) - rs % desc %  npts (idir) / 2 - 1
01321           lb_send(idir) = pos(1)
01322           ub_send(idir) = pos(2)
01323        ENDDO
01324 
01325        lb_recv(:) = lb_send(:)
01326        ub_recv(:) = ub_send(:)
01327 !$omp parallel do default(none), &
01328 !$omp             shared(pw,lb_send,ub_send,bounds,send_tasks,send_sizes)
01329        DO j = 0, pw % pw_grid % para % group_size - 1
01330 
01331           IF (lb_send(1) .GT.bounds(j,2)) CYCLE
01332           IF (ub_send(1) .LT.bounds(j,1)) CYCLE
01333           IF (lb_send(2) .GT.bounds(j,4)) CYCLE
01334           IF (ub_send(2) .LT.bounds(j,3)) CYCLE
01335 
01336           send_tasks(j,1)= MAX(lb_send(1),bounds(j,1))
01337           send_tasks(j,2)= MIN(ub_send(1),bounds(j,2))
01338           send_tasks(j,3)= MAX(lb_send(2),bounds(j,3))
01339           send_tasks(j,4)= MIN(ub_send(2),bounds(j,4))
01340           send_tasks(j,5)= lb_send(3)
01341           send_tasks(j,6)= ub_send(3)
01342           send_sizes(j)  = (send_tasks( j ,2)-send_tasks( j ,1)+1)* &
01343                (send_tasks( j ,4)-send_tasks( j ,3)+1)*(send_tasks( j ,6)-send_tasks( j ,5)+1)
01344 
01345        END DO
01346 !$omp end parallel do
01347 
01348        send_disps(0)=0
01349        recv_disps(0)=0
01350        DO i = 1, pw % pw_grid % para % group_size - 1
01351           send_disps(i) = send_disps(i-1) + send_sizes(i-1)
01352           recv_disps(i) = recv_disps(i-1) + recv_sizes(i-1)
01353        ENDDO
01354 
01355        CPPrecondition(SUM(send_sizes)==PRODUCT(ub_recv - lb_recv + 1),cp_failure_level,routineP,error,failure)
01356 
01357        ALLOCATE ( send_bufs ( 0:rs % desc % group_size - 1 ), STAT=stat )
01358        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01359        ALLOCATE ( recv_bufs ( 0:rs % desc % group_size - 1 ), STAT=stat )
01360        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01361 
01362        DO i = 0, rs % desc % group_size - 1
01363          IF ( send_sizes(i) .NE. 0 ) THEN
01364            ALLOCATE(send_bufs(i)%array (send_sizes(i)))
01365          ELSE
01366            NULLIFY(send_bufs(i)%array)
01367          END IF
01368          IF ( recv_sizes(i) .NE. 0 ) THEN
01369            ALLOCATE(recv_bufs(i)%array (recv_sizes(i)))
01370          ELSE
01371            NULLIFY(recv_bufs(i)%array)
01372          END IF
01373        END DO
01374 
01375        ALLOCATE (recv_reqs (0:rs % desc % group_size - 1), STAT=stat)
01376        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01377        recv_reqs = mp_request_null
01378 
01379        DO i = 0, rs % desc % group_size - 1
01380          IF ( recv_sizes(i) .NE. 0 ) THEN
01381            CALL mp_irecv(recv_bufs(i)%array, i, rs % desc % group, recv_reqs(i))
01382          END IF
01383        END DO
01384 
01385        ! do packing
01386 !$omp parallel do default(none), &
01387 !$omp             private(k,z,y,x), &
01388 !$omp             shared(rs,send_tasks,send_bufs,send_disps)
01389        DO i = 0, rs % desc % group_size - 1
01390           k=0
01391           DO z = send_tasks(i,5) , send_tasks(i,6)
01392              DO y = send_tasks(i,3) , send_tasks(i,4)
01393                 DO x = send_tasks(i,1) , send_tasks(i,2)
01394                    k=k+1
01395                    send_bufs(i)%array(k)= rs % r (x,y,z)
01396                 ENDDO
01397              ENDDO
01398           ENDDO
01399        ENDDO
01400 !$omp end parallel do
01401 
01402        ALLOCATE (send_reqs (0:rs % desc % group_size - 1), STAT=stat)
01403        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01404        send_reqs = mp_request_null
01405 
01406        DO i = 0, rs % desc % group_size - 1
01407          IF ( send_sizes(i) .NE. 0 ) THEN
01408            CALL mp_isend(send_bufs(i)%array, i, rs % desc % group, send_reqs(i))
01409          END IF
01410        END DO
01411 
01412        ! do unpacking
01413        ! no OMP here so we can unpack each message as it arrives
01414        DO i = 0, rs % desc % group_size - 1
01415           IF ( recv_sizes(i) .EQ. 0) CYCLE
01416 
01417           CALL mp_waitany(recv_reqs, completed)
01418           k=0
01419           DO z = recv_tasks(completed-1,5) , recv_tasks(completed-1,6)
01420              DO y = recv_tasks(completed-1,3) , recv_tasks(completed-1,4)
01421                 DO x = recv_tasks(completed-1,1) , recv_tasks(completed-1,2)
01422                    k=k+1
01423                    pw % cr3d (x,y,z) = recv_bufs(completed-1)%array(k)
01424                 ENDDO
01425              ENDDO
01426           ENDDO
01427        ENDDO
01428 
01429        CALL mp_waitall(send_reqs)
01430 
01431        DEALLOCATE ( recv_reqs, STAT=stat)
01432        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01433        DEALLOCATE ( send_reqs, STAT=stat)
01434        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01435 
01436 
01437        DO i = 0, rs % desc % group_size - 1
01438          IF ( ASSOCIATED(send_bufs(i)%array ) ) THEN
01439            DEALLOCATE(send_bufs(i)%array)
01440          END IF
01441          IF ( ASSOCIATED(recv_bufs(i)%array ) ) THEN
01442            DEALLOCATE(recv_bufs(i)%array)
01443          END IF
01444        END DO
01445 
01446        DEALLOCATE ( send_bufs, STAT=stat)
01447        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01448        DEALLOCATE ( recv_bufs, STAT=stat)
01449        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01450        DEALLOCATE ( send_tasks, STAT=stat)
01451        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01452        DEALLOCATE ( send_sizes, STAT=stat)
01453        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01454        DEALLOCATE ( send_disps, STAT=stat)
01455        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01456        DEALLOCATE ( recv_tasks, STAT=stat)
01457        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01458        DEALLOCATE ( recv_sizes, STAT=stat)
01459        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01460        DEALLOCATE ( recv_disps, STAT=stat)
01461        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01462 
01463        IF (debug_this_module) THEN
01464           ! safety check, to be removed once we're absolute sure the routine is correct
01465           pw_sum=pw_integrate_function(pw,error=error)
01466           IF (ABS(pw_sum-rs_sum)/MAX(1.0_dp,ABS(pw_sum),ABS(rs_sum))>EPSILON(rs_sum)*1000) THEN
01467              WRITE(error_string,'(A,6(1X,I4.4),3F25.16)') "rs_pw_transfer_distributed", &
01468                   rs % desc %  npts, rs % desc % group_dim, pw_sum,rs_sum,ABS(pw_sum-rs_sum)
01469              CALL stop_program(routineN,moduleN,__LINE__,&
01470                                error_string//" Please report this bug ... quick workaround: use "//&
01471                                "DISTRIBUTION_TYPE REPLICATED")
01472           ENDIF
01473        ENDIF
01474 
01475     ELSE
01476 
01477        ! pw to rs transfer
01478 
01479        CALL rs_grid_zero( rs )
01480 
01481        ! This is the real redistribution
01482 
01483        ALLOCATE ( bounds ( 0:pw % pw_grid % para % group_size - 1, 1:4 ), STAT=stat )
01484        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01485 
01486        DO i = 0, pw % pw_grid % para % group_size - 1
01487           bounds ( i , 1:2 ) = pw % pw_grid % para % bo (1:2,1,i,1)
01488           bounds ( i , 3:4 ) = pw % pw_grid % para % bo (1:2,2,i,1)
01489           bounds ( i , 1:2 ) = bounds ( i , 1:2 ) - pw % pw_grid % npts (1) / 2 - 1
01490           bounds ( i , 3:4 ) = bounds ( i , 3:4 ) - pw % pw_grid % npts (2) / 2 - 1
01491        ENDDO
01492 
01493        ALLOCATE ( send_tasks ( 0:pw % pw_grid % para % group_size -1,1:6 ), STAT=stat )
01494        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01495        ALLOCATE ( send_sizes ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01496        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01497        ALLOCATE ( send_disps ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01498        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01499        ALLOCATE ( recv_tasks ( 0:pw % pw_grid % para % group_size -1,1:6 ), STAT=stat )
01500        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01501        ALLOCATE ( recv_sizes ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01502        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01503        ALLOCATE ( recv_disps ( 0:pw % pw_grid % para % group_size -1 ), STAT=stat )
01504        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01505 
01506        send_tasks = 0
01507        send_tasks(:,1)=1
01508        send_tasks(:,2)=0
01509        send_tasks(:,3)=1
01510        send_tasks(:,4)=0
01511        send_tasks(:,5)=1
01512        send_tasks(:,6)=0
01513        send_sizes = 0
01514 
01515        recv_tasks = 0
01516        recv_tasks(:,1)=1
01517        recv_tasks(:,2)=0
01518        send_tasks(:,3)=1
01519        send_tasks(:,4)=0
01520        send_tasks(:,5)=1
01521        send_tasks(:,6)=0
01522        recv_sizes = 0
01523 
01524        my_rs_rank = rs % desc % my_pos
01525        my_pw_rank = pw % pw_grid % para % rs_mpo
01526 
01527        ! find the processors that should hold our data
01528        ! should be part of the rs grid type
01529        ! this is a loop over real ranks (i.e. the in-order cartesian ranks)
01530        ! do the recv and send tasks in two seperate loops which will
01531        ! load balance better for OpenMP with large numbers of MPI tasks
01532 
01533        ! this is the reverse of rs2pw: what were the sends are now the recvs
01534 
01535 !$omp parallel do default(none), &
01536 !$omp             private(coords,idir,pos,lb_send,ub_send), &
01537 !$omp             shared(rs,bounds,my_rs_rank,send_tasks,send_sizes,pw)
01538        DO i = 0, pw % pw_grid % para % group_size -1
01539 
01540           coords(:) = rs % desc % rank2coord ( : , rs % desc % real2virtual(i))
01541           !calculate the real rs grid points on each processor
01542           !coords is the part of the grid that rank i actually holds
01543           DO idir = 1,3
01544              pos (:) = get_limit ( rs % desc % npts ( idir ), rs % desc % group_dim ( idir ), coords(idir) )
01545              pos (:) = pos (:) - rs % desc % npts (idir) / 2 - 1
01546              lb_send(idir) = pos(1)
01547              ub_send(idir) = pos(2)
01548           ENDDO
01549 
01550           IF (ub_send(1) .LT.bounds(my_rs_rank,1)) CYCLE
01551           IF (lb_send(1) .GT.bounds(my_rs_rank,2)) CYCLE
01552           IF (ub_send(2) .LT.bounds(my_rs_rank,3)) CYCLE
01553           IF (lb_send(2) .GT.bounds(my_rs_rank,4)) CYCLE
01554 
01555           send_tasks(i,1)= MAX(lb_send(1),bounds(my_rs_rank,1))
01556           send_tasks(i,2)= MIN(ub_send(1),bounds(my_rs_rank,2))
01557           send_tasks(i,3)= MAX(lb_send(2),bounds(my_rs_rank,3))
01558           send_tasks(i,4)= MIN(ub_send(2),bounds(my_rs_rank,4))
01559           send_tasks(i,5)= lb_send(3)
01560           send_tasks(i,6)= ub_send(3)
01561           send_sizes(i)  = (send_tasks(i,2)-send_tasks( i ,1)+1)* &
01562                (send_tasks( i ,4)-send_tasks( i ,3)+1)*(send_tasks( i ,6)-send_tasks( i ,5)+1)
01563 
01564        ENDDO
01565 !$omp end parallel do
01566 
01567        coords(:) = rs % desc % rank2coord ( : , rs % desc % real2virtual(my_rs_rank))
01568        DO idir = 1,3
01569           pos (:) = get_limit ( rs % desc % npts ( idir ), rs % desc % group_dim ( idir ), coords(idir) )
01570           pos (:) = pos (:) - rs % desc % npts (idir) / 2 - 1
01571           lb_send(idir) = pos(1)
01572           ub_send(idir) = pos(2)
01573        ENDDO
01574 
01575        lb_recv(:) = lb_send(:)
01576        ub_recv(:) = ub_send(:)
01577 
01578 !$omp parallel do default(none), &
01579 !$omp             shared(pw,lb_send,ub_send,bounds,recv_tasks,recv_sizes)
01580        DO j = 0, pw % pw_grid % para % group_size - 1
01581 
01582           IF (ub_send(1) .LT.bounds(j,1)) CYCLE
01583           IF (lb_send(1) .GT.bounds(j,2)) CYCLE
01584           IF (ub_send(2) .LT.bounds(j,3)) CYCLE
01585           IF (lb_send(2) .GT.bounds(j,4)) CYCLE
01586 
01587           recv_tasks(j,1)= MAX(lb_send(1),bounds(j,1))
01588           recv_tasks(j,2)= MIN(ub_send(1),bounds(j,2))
01589           recv_tasks(j,3)= MAX(lb_send(2),bounds(j,3))
01590           recv_tasks(j,4)= MIN(ub_send(2),bounds(j,4))
01591           recv_tasks(j,5)= lb_send(3)
01592           recv_tasks(j,6)= ub_send(3)
01593           recv_sizes(j)  = (recv_tasks( j ,2)-recv_tasks( j ,1)+1)* &
01594                (recv_tasks( j ,4)-recv_tasks( j ,3)+1)*(recv_tasks( j ,6)-recv_tasks( j ,5)+1)
01595 
01596        ENDDO
01597 !$omp end parallel do
01598 
01599        send_disps(0)=0
01600        recv_disps(0)=0
01601        DO i = 1, pw % pw_grid % para % group_size - 1
01602           send_disps(i) = send_disps(i-1) + send_sizes(i-1)
01603           recv_disps(i) = recv_disps(i-1) + recv_sizes(i-1)
01604        ENDDO
01605 
01606        CPPrecondition(SUM(recv_sizes)==PRODUCT(ub_recv - lb_recv + 1),cp_failure_level,routineP,error,failure)
01607 
01608        ALLOCATE ( send_bufs ( 0:rs % desc % group_size - 1 ), STAT=stat )
01609        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01610        ALLOCATE ( recv_bufs ( 0:rs % desc % group_size - 1 ), STAT=stat )
01611        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01612 
01613        DO i = 0, rs % desc % group_size - 1
01614          IF ( send_sizes(i) .NE. 0 ) THEN
01615            ALLOCATE(send_bufs(i)%array (send_sizes(i)))
01616          ELSE
01617            NULLIFY(send_bufs(i)%array)
01618          END IF
01619          IF ( recv_sizes(i) .NE. 0 ) THEN
01620            ALLOCATE(recv_bufs(i)%array (recv_sizes(i)))
01621          ELSE
01622            NULLIFY(recv_bufs(i)%array)
01623          END IF
01624        END DO
01625 
01626        ALLOCATE (recv_reqs (0:rs % desc % group_size - 1), STAT=stat)
01627        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01628        recv_reqs = mp_request_null
01629 
01630        DO i = 0, rs % desc % group_size - 1
01631          IF ( recv_sizes(i) .NE. 0 ) THEN
01632            CALL mp_irecv(recv_bufs(i)%array, i, rs % desc % group, recv_reqs(i))
01633          END IF
01634        END DO
01635 
01636        ! do packing
01637 !$omp parallel do default(none), &
01638 !$omp             private(k,z,y,x), &
01639 !$omp             shared(pw,rs,send_tasks,send_bufs,send_disps)
01640        DO i = 0, rs % desc % group_size - 1
01641           k=0
01642           DO z = send_tasks(i,5) , send_tasks(i,6)
01643              DO y = send_tasks(i,3) , send_tasks(i,4)
01644                 DO x = send_tasks(i,1) , send_tasks(i,2)
01645                    k=k+1
01646                    send_bufs(i)%array(k)= pw % cr3d (x,y,z)
01647                 ENDDO
01648              ENDDO
01649           ENDDO
01650        ENDDO
01651 !$omp end parallel do
01652 
01653        ALLOCATE (send_reqs (0:rs % desc % group_size - 1), STAT=stat)
01654        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01655        send_reqs = mp_request_null
01656 
01657        DO i = 0, rs % desc % group_size - 1
01658          IF ( send_sizes(i) .NE. 0 ) THEN
01659            CALL mp_isend(send_bufs(i)%array, i, rs % desc % group, send_reqs(i))
01660          END IF
01661        END DO
01662 
01663        ! do unpacking
01664        ! no OMP here so we can unpack each message as it arrives
01665 
01666        DO i = 0, rs % desc % group_size - 1
01667           IF ( recv_sizes(i) .EQ. 0) CYCLE
01668 
01669           CALL mp_waitany(recv_reqs, completed)
01670           k=0
01671           DO z = recv_tasks(completed-1,5) , recv_tasks(completed-1,6)
01672              DO y = recv_tasks(completed-1,3) , recv_tasks(completed-1,4)
01673                 DO x = recv_tasks(completed-1,1) , recv_tasks(completed-1,2)
01674                    k=k+1
01675                    rs % r (x,y,z) = recv_bufs(completed-1)%array(k)
01676                 ENDDO
01677              ENDDO
01678           ENDDO
01679        ENDDO
01680 
01681        CALL mp_waitall(send_reqs)
01682 
01683        DEALLOCATE ( recv_reqs, STAT=stat)
01684        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01685        DEALLOCATE ( send_reqs, STAT=stat)
01686        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01687 
01688 
01689        DO i = 0, rs % desc % group_size - 1
01690          IF ( ASSOCIATED(send_bufs(i)%array ) ) THEN
01691            DEALLOCATE(send_bufs(i)%array)
01692          END IF
01693          IF ( ASSOCIATED(recv_bufs(i)%array ) ) THEN
01694            DEALLOCATE(recv_bufs(i)%array)
01695          END IF
01696        END DO
01697 
01698 
01699        DEALLOCATE ( send_bufs, STAT=stat)
01700        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01701        DEALLOCATE ( recv_bufs, STAT=stat)
01702        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01703        DEALLOCATE ( send_tasks, STAT=stat)
01704        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01705        DEALLOCATE ( send_sizes, STAT=stat)
01706        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01707        DEALLOCATE ( send_disps, STAT=stat)
01708        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01709        DEALLOCATE ( recv_tasks, STAT=stat)
01710        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01711        DEALLOCATE ( recv_sizes, STAT=stat)
01712        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01713        DEALLOCATE ( recv_disps, STAT=stat)
01714        CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01715 
01716        ! now pass wings around
01717        halo_swapped = .FALSE.
01718 
01719        DO idir = 1,3
01720 
01721           IF ( rs % desc % perd (idir) /= 1) THEN
01722 
01723              ALLOCATE ( dshifts ( 0:rs % desc % neighbours (idir ) ), STAT=stat )
01724              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01725              ALLOCATE ( ushifts ( 0:rs % desc % neighbours (idir ) ), STAT=stat )
01726              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01727              ushifts = 0
01728              dshifts = 0
01729 
01730              DO n_shifts = 1, rs % desc % neighbours(idir)
01731 
01732                 ! need to take into account the possible varying widths of neighbouring cells
01733                 ! ushifts and dshifts hold the real size of the neighbouring cells
01734 
01735                 position = MODULO ( rs % desc % virtual_group_coor (idir) - n_shifts, rs % desc % group_dim (idir))
01736                 neighbours = get_limit ( rs % desc % npts ( idir ), rs % desc % group_dim ( idir ), position )
01737                 dshifts(n_shifts) = dshifts(n_shifts-1) + ( neighbours (2) - neighbours (1) + 1 )
01738 
01739                 position = MODULO ( rs % desc % virtual_group_coor (idir) + n_shifts, rs % desc % group_dim (idir))
01740                 neighbours = get_limit ( rs % desc % npts ( idir ), rs % desc % group_dim ( idir ), position )
01741                 ushifts(n_shifts) = ushifts(n_shifts-1) + ( neighbours (2) - neighbours (1) + 1 )
01742 
01743                 ! The border data has to be send/received from the neighbors
01744                 ! First we calculate the source and destination processes for the shift
01745                 ! The first shift is "downwards"
01746 
01747                 CALL cart_shift (rs, idir, -1 * n_shifts, source_down, dest_down )
01748 
01749                 lb_send_down ( : ) = rs % lb_local ( : )
01750                 ub_send_down ( : ) = rs % ub_local ( : )
01751                 lb_recv_down ( : ) = rs % lb_local ( : )
01752                 ub_recv_down ( : ) = rs % ub_local ( : )
01753 
01754                 IF( dshifts ( n_shifts - 1 ) .LE. rs % desc % border ) THEN
01755                    lb_send_down ( idir ) = lb_send_down ( idir ) + rs % desc % border
01756                    ub_send_down ( idir ) = MIN (ub_send_down (idir) - rs% desc % border,&
01757                         lb_send_down ( idir ) + rs % desc % border - 1 - dshifts (n_shifts - 1))
01758 
01759                    lb_recv_down ( idir ) = ub_recv_down ( idir ) - rs % desc % border + 1 + ushifts ( n_shifts - 1 )
01760                    ub_recv_down ( idir ) = MIN (ub_recv_down ( idir ),&
01761                         ub_recv_down ( idir ) - rs%desc % border + ushifts ( n_shifts ))
01762                 ELSE
01763                    lb_send_down( idir ) = 0
01764                    ub_send_down( idir ) = -1
01765                    lb_recv_down( idir ) = 0
01766                    ub_recv_down( idir ) = -1
01767                 ENDIF
01768 
01769                 DO i=1,3
01770                    IF ( .NOT. ( halo_swapped(i) .OR. i .EQ. idir ) ) THEN
01771                       lb_send_down(i) = rs % lb_real(i)
01772                       ub_send_down(i) = rs % ub_real(i)
01773                       lb_recv_down(i) = rs % lb_real(i)
01774                       ub_recv_down(i) = rs % ub_real(i)
01775                    ENDIF
01776                 ENDDO
01777 
01778                 ! allocate the recv buffer
01779                 nn = PRODUCT ( ub_recv_down - lb_recv_down + 1 )
01780                 ALLOCATE ( recv_buf_3d_down ( lb_recv_down(1):ub_recv_down(1), &
01781                      lb_recv_down(2):ub_recv_down(2), lb_recv_down(3):ub_recv_down(3) ), STAT=stat )
01782                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01783 
01784                 ! recv buffer is now ready, so post the recieve
01785                 CALL mp_irecv (recv_buf_3d_down, source_down, rs % desc % group, req(1))
01786 
01787                 ! now allocate,pack and send the send buffer
01788                 nn = PRODUCT ( ub_send_down - lb_send_down + 1 )
01789                 ALLOCATE ( send_buf_3d_down ( lb_send_down(1):ub_send_down(1), &
01790                      lb_send_down(2):ub_send_down(2), lb_send_down(3):ub_send_down(3) ), STAT=stat )
01791                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01792 
01793 !$omp parallel default(none), &
01794 !$omp          private(lb,ub,my_id,num_threads), &
01795 !$omp          shared(send_buf_3d_down,rs,lb_send_down,ub_send_down)
01796 !$              num_threads = MIN(omp_get_max_threads(), ub_send_down(3)-lb_send_down(3)+1)
01797 !$              my_id = omp_get_thread_num()
01798                 IF (my_id < num_threads) THEN
01799                   lb = lb_send_down(3) + ((ub_send_down(3)-lb_send_down(3)+1)*my_id)/num_threads
01800                   ub = lb_send_down(3) + ((ub_send_down(3)-lb_send_down(3)+1)*(my_id+1))/num_threads - 1
01801 
01802                   send_buf_3d_down ( lb_send_down(1):ub_send_down(1) ,lb_send_down(2):ub_send_down(2), &
01803                                      lb:ub ) = rs % r ( lb_send_down(1):ub_send_down(1), &
01804                                      lb_send_down(2):ub_send_down(2), lb:ub )
01805                 END IF
01806 !$omp end parallel
01807 
01808                 CALL mp_isend ( send_buf_3d_down, dest_down, rs % desc % group, req(3))
01809 
01810                 ! Now for the other direction
01811 
01812                 CALL cart_shift ( rs, idir, n_shifts, source_up, dest_up )
01813 
01814                 lb_send_up ( : ) = rs % lb_local ( : )
01815                 ub_send_up ( : ) = rs % ub_local ( : )
01816                 lb_recv_up ( : ) = rs % lb_local ( : )
01817                 ub_recv_up ( : ) = rs % ub_local ( : )
01818 
01819                 IF( ushifts ( n_shifts - 1 ) .LE. rs % desc % border ) THEN
01820                    ub_send_up ( idir ) = ub_send_up ( idir ) - rs % desc % border
01821                    lb_send_up ( idir ) = MAX (lb_send_up( idir) + rs % desc % border,&
01822                         ub_send_up ( idir ) - rs % desc % border + 1 + ushifts (n_shifts - 1) )
01823 
01824                    ub_recv_up ( idir ) = lb_recv_up ( idir ) + rs % desc % border - 1 - dshifts ( n_shifts - 1 )
01825                    lb_recv_up ( idir ) = MAX (lb_recv_up ( idir ),&
01826                         lb_recv_up ( idir ) + rs % desc % border - dshifts ( n_shifts ))
01827                 ELSE
01828                    lb_send_up( idir ) = 0
01829                    ub_send_up( idir ) = -1
01830                    lb_recv_up( idir ) = 0
01831                    ub_recv_up( idir ) = -1
01832                 ENDIF
01833 
01834                 DO i=1,3
01835                    IF ( .NOT. ( halo_swapped(i) .OR. i .EQ. idir ) ) THEN
01836                       lb_send_up(i) = rs % lb_real(i)
01837                       ub_send_up(i) = rs % ub_real(i)
01838                       lb_recv_up(i) = rs % lb_real(i)
01839                       ub_recv_up(i) = rs % ub_real(i)
01840                    ENDIF
01841                 ENDDO
01842 
01843                 ! allocate the recv buffer
01844                 nn = PRODUCT ( ub_recv_up - lb_recv_up + 1 )
01845                 ALLOCATE ( recv_buf_3d_up ( lb_recv_up(1):ub_recv_up(1), &
01846                      lb_recv_up(2):ub_recv_up(2), lb_recv_up(3):ub_recv_up(3) ), STAT=stat )
01847                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01848 
01849                 ! recv buffer is now ready, so post the recieve
01850 
01851                 CALL mp_irecv (recv_buf_3d_up, source_up, rs % desc % group, req(2))
01852 
01853                 ! now allocate,pack and send the send buffer
01854                 nn = PRODUCT ( ub_send_up - lb_send_up + 1 )
01855                 ALLOCATE ( send_buf_3d_up ( lb_send_up(1):ub_send_up(1), &
01856                      lb_send_up(2):ub_send_up(2), lb_send_up(3):ub_send_up(3) ), STAT=stat )
01857                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01858 
01859 !$omp parallel default(none), &
01860 !$omp          private(lb,ub,my_id,num_threads), &
01861 !$omp          shared(send_buf_3d_up,rs,lb_send_up,ub_send_up)
01862 !$              num_threads = MIN(omp_get_max_threads(), ub_send_up(3)-lb_send_up(3)+1)
01863 !$              my_id = omp_get_thread_num()
01864                 IF (my_id < num_threads) THEN
01865                   lb = lb_send_up(3) + ((ub_send_up(3)-lb_send_up(3)+1)*my_id)/num_threads
01866                   ub = lb_send_up(3) + ((ub_send_up(3)-lb_send_up(3)+1)*(my_id+1))/num_threads - 1
01867 
01868                   send_buf_3d_up ( lb_send_up(1):ub_send_up(1), lb_send_up(2):ub_send_up(2), &
01869                                    lb:ub ) = rs % r ( lb_send_up(1):ub_send_up(1), &
01870                                    lb_send_up(2):ub_send_up(2), lb:ub )
01871                 END IF
01872 !$omp end parallel
01873 
01874                 CALL mp_isend ( send_buf_3d_up, dest_up, rs % desc % group, req(4) )
01875 
01876                 ! wait for a recv to complete, then we can unpack
01877 
01878                 DO i=1,2
01879 
01880                    CALL mp_waitany (req(1:2),completed)
01881 
01882                    IF ( completed .EQ. 1 ) THEN
01883 
01884                       ! only some procs may need later shifts
01885                       IF(ub_recv_down(idir).GE.lb_recv_down(idir))THEN
01886 
01887                          ! Add the data to the RS Grid
01888 !$omp parallel default(none), &
01889 !$omp          private(lb,ub,my_id,num_threads), &
01890 !$omp          shared(recv_buf_3d_down,rs,lb_recv_down,ub_recv_down)
01891 !$                       num_threads = MIN(omp_get_max_threads(), ub_recv_down(3)-lb_recv_down(3)+1)
01892 !$                       my_id = omp_get_thread_num()
01893                          IF (my_id < num_threads) THEN
01894                            lb = lb_recv_down(3) + ((ub_recv_down(3)-lb_recv_down(3)+1)*my_id)/num_threads
01895                            ub = lb_recv_down(3) + ((ub_recv_down(3)-lb_recv_down(3)+1)*(my_id+1))/num_threads - 1
01896 
01897                            rs % r ( lb_recv_down(1):ub_recv_down(1), lb_recv_down(2):ub_recv_down(2), &
01898                                 lb:ub ) =  recv_buf_3d_down ( :, :, lb:ub )
01899                          END IF
01900 !$omp end parallel
01901                       END IF
01902 
01903                       DEALLOCATE ( recv_buf_3d_down, STAT=stat )
01904                       CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01905                    ELSE
01906 
01907                       ! only some procs may need later shifts
01908                       IF(ub_recv_up(idir).GE.lb_recv_up(idir))THEN
01909 
01910                          ! Add the data to the RS Grid
01911 !$omp parallel default(none), &
01912 !$omp          private(lb,ub,my_id,num_threads), &
01913 !$omp          shared(recv_buf_3d_up,rs,lb_recv_up,ub_recv_up)
01914 !$                       num_threads = MIN(omp_get_max_threads(), ub_recv_up(3)-lb_recv_up(3)+1)
01915 !$                       my_id = omp_get_thread_num()
01916                          IF (my_id < num_threads) THEN
01917                            lb = lb_recv_up(3) + ((ub_recv_up(3)-lb_recv_up(3)+1)*my_id)/num_threads
01918                            ub = lb_recv_up(3) + ((ub_recv_up(3)-lb_recv_up(3)+1)*(my_id+1))/num_threads - 1
01919 
01920                            rs % r ( lb_recv_up(1):ub_recv_up(1), lb_recv_up(2):ub_recv_up(2), &
01921                                 lb:ub ) = recv_buf_3d_up ( :, :, lb:ub )
01922                          END IF
01923 !$omp end parallel
01924                       END IF
01925 
01926                       DEALLOCATE ( recv_buf_3d_up, STAT=stat )
01927                       CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01928                    END IF
01929                 END DO
01930 
01931                 CALL mp_waitall (req(3:4))
01932 
01933                 DEALLOCATE ( send_buf_3d_down, STAT=stat )
01934                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01935                 DEALLOCATE ( send_buf_3d_up, STAT=stat )
01936                 CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01937              END DO
01938 
01939              DEALLOCATE ( ushifts, STAT=stat)
01940              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01941              DEALLOCATE ( dshifts, STAT=stat)
01942              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
01943           END IF
01944 
01945           halo_swapped (idir) = .TRUE.
01946 
01947        END DO
01948     END IF
01949 
01950   END SUBROUTINE rs_pw_transfer_distributed
01951 
01952 ! *****************************************************************************
01958   SUBROUTINE rs_grid_zero ( rs )
01959 
01960     TYPE(realspace_grid_type), POINTER       :: rs
01961 
01962     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_zero', 
01963       routineP = moduleN//':'//routineN
01964 
01965     INTEGER                                  :: handle, i, j, k, l(3), u(3)
01966 
01967     CALL timeset(routineN,handle)
01968     l(1) = LBOUND( rs % r, 1 );l(2) = LBOUND( rs % r, 2 );l(3) = LBOUND( rs % r, 3 )
01969     u(1) = UBOUND( rs % r, 1 );u(2) = UBOUND( rs % r, 2 );u(3) = UBOUND( rs % r, 3 )
01970 !$omp parallel do default(none) __COLLAPSE3 &
01971 !$omp             private(i,j,k) &
01972 !$omp             shared(rs,l,u)
01973     DO k = l(3),u(3)
01974     DO j = l(2),u(2)
01975     DO i = l(1),u(1)
01976        rs % r ( i, j, k ) = 0.0_dp
01977     ENDDO
01978     ENDDO
01979     ENDDO
01980 !$omp end parallel do
01981     CALL timestop(handle)
01982 
01983   END SUBROUTINE rs_grid_zero
01984 
01985 ! *****************************************************************************
01991 SUBROUTINE rs_grid_mult_and_add ( rs1 ,rs2, rs3, scalar )
01992 
01993     TYPE(realspace_grid_type), POINTER       :: rs1, rs2, rs3
01994     REAL(dp), INTENT(IN)                     :: scalar
01995 
01996     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_mult_and_add', 
01997       routineP = moduleN//':'//routineN
01998 
01999     INTEGER                                  :: handle, i, j, k, l(3), u(3)
02000 
02001 !-----------------------------------------------------------------------------!
02002 
02003    CALL timeset(routineN,handle)
02004    IF(scalar.NE.0.0_dp) THEN
02005       l(1) = LBOUND( rs1 % r, 1 );l(2) = LBOUND( rs1 % r, 2 );l(3) = LBOUND( rs1 % r, 3 )
02006       u(1) = UBOUND( rs1 % r, 1 );u(2) = UBOUND( rs1 % r, 2 );u(3) = UBOUND( rs1 % r, 3 )
02007 !$omp parallel do default(none) __COLLAPSE3 &
02008 !$omp             private(i,j,k) &
02009 !$omp             shared(rs1,rs2,rs3,scalar,l,u)
02010       DO k = l(3),u(3)
02011       DO j = l(2),u(2)
02012       DO i = l(1),u(1)
02013          rs1 % r ( i, j, k ) = rs1 % r ( i, j, k ) + scalar * rs2 % r ( i, j, k ) * rs3 % r ( i, j, k )
02014       ENDDO
02015       ENDDO
02016       ENDDO
02017 !$omp end parallel do
02018    ENDIF
02019    CALL timestop(handle)
02020  END SUBROUTINE rs_grid_mult_and_add
02021 
02022 ! *****************************************************************************
02029   SUBROUTINE rs_grid_set_box ( pw_grid, rs, error )
02030 
02031     TYPE(pw_grid_type), POINTER              :: pw_grid
02032     TYPE(realspace_grid_type), POINTER       :: rs
02033     TYPE(cp_error_type), INTENT(inout)       :: error
02034 
02035     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_set_box', 
02036       routineP = moduleN//':'//routineN
02037 
02038     LOGICAL                                  :: failure
02039 
02040     CPPrecondition(ASSOCIATED(pw_grid),cp_failure_level,routineP,error,failure)
02041     CPPrecondition(ASSOCIATED(rs),cp_failure_level,routineP,error,failure)
02042     CPPrecondition(rs%desc % grid_id==pw_grid%id_nr,cp_failure_level,routineP,error,failure)
02043     rs % desc % dh = pw_grid%dh
02044     rs % desc % dh_inv = pw_grid%dh_inv
02045 
02046   END SUBROUTINE rs_grid_set_box
02047 
02048 ! *****************************************************************************
02057   SUBROUTINE rs_grid_retain(rs_grid, error)
02058     TYPE(realspace_grid_type), POINTER       :: rs_grid
02059     TYPE(cp_error_type), INTENT(inout)       :: error
02060 
02061     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_retain', 
02062       routineP = moduleN//':'//routineN
02063 
02064     LOGICAL                                  :: failure
02065 
02066     failure=.FALSE.
02067 
02068     CPPrecondition(ASSOCIATED(rs_grid),cp_failure_level,routineP,error,failure)
02069     IF (.NOT. failure) THEN
02070        CPPreconditionNoFail(rs_grid%ref_count>0,cp_failure_level,routineP,error)
02071        rs_grid%ref_count=rs_grid%ref_count+1
02072     END IF
02073   END SUBROUTINE rs_grid_retain
02074 
02075 ! *****************************************************************************
02084   SUBROUTINE rs_grid_retain_descriptor(rs_desc, error)
02085     TYPE(realspace_grid_desc_type), POINTER  :: rs_desc
02086     TYPE(cp_error_type), INTENT(inout)       :: error
02087 
02088     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_retain_descriptor', 
02089       routineP = moduleN//':'//routineN
02090 
02091     LOGICAL                                  :: failure
02092 
02093     failure=.FALSE.
02094 
02095     CPPrecondition(ASSOCIATED(rs_desc),cp_failure_level,routineP,error,failure)
02096     IF (.NOT. failure) THEN
02097        CPPreconditionNoFail(rs_desc%ref_count>0,cp_failure_level,routineP,error)
02098        rs_desc%ref_count=rs_desc%ref_count+1
02099     END IF
02100   END SUBROUTINE rs_grid_retain_descriptor
02101 
02102 ! *****************************************************************************
02111   SUBROUTINE rs_grid_release(rs_grid,error)
02112     TYPE(realspace_grid_type), POINTER       :: rs_grid
02113     TYPE(cp_error_type), INTENT(inout)       :: error
02114 
02115     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_release', 
02116       routineP = moduleN//':'//routineN
02117 
02118     INTEGER                                  :: stat
02119     LOGICAL                                  :: failure
02120 
02121     failure=.FALSE.
02122 
02123     IF (ASSOCIATED(rs_grid)) THEN
02124        CPPreconditionNoFail(rs_grid%ref_count>0,cp_failure_level,routineP,error)
02125        rs_grid%ref_count=rs_grid%ref_count-1
02126        IF (rs_grid%ref_count==0) THEN
02127 
02128           CALL rs_grid_release_descriptor(rs_grid % desc, error=error)
02129 
02130           allocated_rs_grid_count=allocated_rs_grid_count-1
02131           DEALLOCATE ( rs_grid % r, STAT=stat )
02132           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02133           DEALLOCATE ( rs_grid % px , STAT=stat )
02134           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02135           DEALLOCATE ( rs_grid % py , STAT=stat )
02136           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02137           DEALLOCATE ( rs_grid % pz , STAT=stat )
02138           CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02139           DEALLOCATE(rs_grid, stat=stat)
02140           CPPostconditionNoFail(stat==0,cp_warning_level,routineP,error)
02141           NULLIFY(rs_grid)
02142        END IF
02143     END IF
02144   END SUBROUTINE rs_grid_release
02145 
02146 ! *****************************************************************************
02155   SUBROUTINE rs_grid_release_descriptor(rs_desc,error)
02156     TYPE(realspace_grid_desc_type), POINTER  :: rs_desc
02157     TYPE(cp_error_type), INTENT(inout)       :: error
02158 
02159     CHARACTER(len=*), PARAMETER :: routineN = 'rs_grid_release_descriptor', 
02160       routineP = moduleN//':'//routineN
02161 
02162     INTEGER                                  :: stat
02163     LOGICAL                                  :: failure
02164 
02165     failure=.FALSE.
02166 
02167     IF (ASSOCIATED(rs_desc)) THEN
02168        CPPreconditionNoFail(rs_desc%ref_count>0,cp_failure_level,routineP,error)
02169        rs_desc%ref_count=rs_desc%ref_count-1
02170        IF (rs_desc%ref_count==0) THEN
02171 
02172           CALL pw_grid_release(rs_desc%pw,error=error)
02173 
02174           IF ( rs_desc % parallel ) THEN
02175              ! release the group communicator
02176              CALL mp_comm_free ( rs_desc % group )
02177 
02178              DEALLOCATE ( rs_desc % virtual2real, STAT=stat )
02179              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02180              DEALLOCATE ( rs_desc % real2virtual, STAT=stat )
02181              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02182           END IF
02183 
02184           IF (rs_desc % distributed) THEN
02185              DEALLOCATE ( rs_desc % rank2coord , STAT=stat )
02186              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02187              DEALLOCATE ( rs_desc % coord2rank , STAT=stat )
02188              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02189              DEALLOCATE ( rs_desc % lb_global , STAT=stat )
02190              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02191              DEALLOCATE ( rs_desc % ub_global , STAT=stat )
02192              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02193              DEALLOCATE ( rs_desc % x2coord , STAT=stat )
02194              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02195              DEALLOCATE ( rs_desc % y2coord , STAT=stat )
02196              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02197              DEALLOCATE ( rs_desc % z2coord , STAT=stat )
02198              CPPostcondition(stat==0,cp_failure_level,routineP,error,failure)
02199           ENDIF
02200 
02201           DEALLOCATE(rs_desc, stat=stat)
02202           CPPostconditionNoFail(stat==0,cp_warning_level,routineP,error)
02203           NULLIFY(rs_desc)
02204        END IF
02205     END IF
02206   END SUBROUTINE rs_grid_release_descriptor
02207 
02208 ! *****************************************************************************
02215   SUBROUTINE cart_shift ( rs_grid, dir, disp, source, dest )
02216 
02217     TYPE(realspace_grid_type), POINTER       :: rs_grid
02218     INTEGER, INTENT(IN)                      :: dir, disp
02219     INTEGER, INTENT(OUT)                     :: source, dest
02220 
02221     CHARACTER(len=*), PARAMETER :: routineN = 'cart_shift', 
02222       routineP = moduleN//':'//routineN
02223 
02224     INTEGER, DIMENSION(3)                    :: shift_coords
02225 
02226     shift_coords = rs_grid % desc % virtual_group_coor
02227     shift_coords(dir) = MODULO ( shift_coords(dir) + disp , rs_grid % desc % group_dim(dir) )
02228     dest = rs_grid % desc % virtual2real(rs_grid % desc % coord2rank(shift_coords(1), shift_coords(2), shift_coords(3)))
02229     shift_coords = rs_grid % desc % virtual_group_coor
02230     shift_coords(dir) = MODULO ( shift_coords(dir) - disp , rs_grid % desc % group_dim(dir) )
02231     source = rs_grid % desc % virtual2real(rs_grid % desc % coord2rank(shift_coords(1), shift_coords(2), shift_coords(3)))
02232 
02233   END SUBROUTINE
02234 
02235 ! *****************************************************************************
02241   FUNCTION rs_grid_max_ngpts(desc) RESULT(max_ngpts)
02242     TYPE(realspace_grid_desc_type), POINTER  :: desc
02243     INTEGER                                  :: max_ngpts
02244 
02245     INTEGER                                  :: i
02246     INTEGER, DIMENSION(3)                    :: lb, ub
02247 
02248     max_ngpts = 0
02249     IF ((desc % pw % para % mode == PW_MODE_LOCAL) .OR. &
02250         ( ALL (desc % group_dim == 1 ))) THEN
02251       max_ngpts = PRODUCT ( desc % npts )
02252     ELSE
02253       DO i=0, desc%group_size-1
02254           lb = desc % lb_global ( :, i )
02255           ub = desc % ub_global ( :, i )
02256           lb = lb - desc % border * ( 1 - desc % perd )
02257           ub = ub + desc % border * ( 1 - desc % perd )
02258           max_ngpts = MAX( max_ngpts, PRODUCT ( ub - lb + 1 ) )
02259       END DO
02260     END IF
02261 
02262   END FUNCTION rs_grid_max_ngpts
02263 
02264 
02265 END MODULE realspace_grid_types