Есть отличная удаленная работа для php+codeception+jenkins+allure+docker спецов. 100% remote! Присоединиться к проекту

Проблема с многопоточностью при подсчете файлов в указанных папках

execution
parallel
java
Теги: #<Tag:0x00007f7b70b16848> #<Tag:0x00007f7b70b16708> #<Tag:0x00007f7b70b165c8>

(Roy Obenon) #1

Создал программку которая считает кол-во файлов в указанных папках и выводит результат. Если не задействовать моногпоточность то выводит всё норм, как только пробую многопот то выводит одни 0.

public class CountFiles extends File implements Runnable {
private static FileFilter forFolder;
private static FileFilter forFile;
private static int countFilesToPrint;
private static List<String> list;
private static String pathname;
public static CountFiles countFiles;


public CountFiles(String pathname) {
    super(pathname);
}

protected static void execute() {
    countFilesToPrint = 0;
    countFiles = new CountFiles(pathname);
    if (!countFiles.isDirectory()) {
        System.out.println(countFiles + " it's not a directory!");
        return;
    }

    if (!countFiles.exists()){
        System.out.println(countFiles + " is not exists!");
        return;
    }

    forFolder = new FileFilter() {

        @Override
        public boolean accept(File pathname) {
        return pathname.isDirectory();
        }
    };
    forFile = new FileFilter() {
        @Override
        public boolean accept(File pathname) {
            return pathname.isFile();
        }
    };

    executeFolder(countFiles);
}

protected static void executeFolder(File nameFolder){
    File[] file = nameFolder.listFiles(forFile);
    File[] folder = nameFolder.listFiles(forFolder);
    countFilesToPrint = countFilesToPrint + file.length;

    for (File file2 :  folder){
        executeFolder(file2);
    }

}

@Override
public void run() {
    execute();
}

public static void main(String[] args) {
    list = Arrays.asList("D:\\Java", "D:\\Load_Tools", "D:\\data");
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < list.size(); i++){
        pathname = list.get(i);
        executorService.execute(new CountFiles(pathname));
        System.out.println(list.get(i) + " - " + countFilesToPrint);
    }
    executorService.shutdown();

    }
}

(Artur Korobeynyk) #2

Снова таки, я не джава программист, но вы запускаете отдельный поток, в котором читаете файлы а в основном процессе тут же выводите список файлов, до того как дочерний процесс отработал. Если запустить на пентиум 1 с 400 мгц частоты, может и сработает =)
Вобщем, скорее всего причина в этом.


(Eugene Tkachenko) #3

Примерно тоже думаю, что написал @arturk
Нужно сначало заиндексить все файлы, создать что-то типа очереди и потом в сейф моде проходиться по очереди.
Посмотрите ответы здесь, есть дельные советы на счет многопоточности: http://javatalks.ru/topics/26341?page=1#131144


(Roy Obenon) #4

А нужно чтобы подсчет сразу шел по всем папкам одновременно


(Artur Korobeynyk) #5

Воспользуйтесь семафорами, мьютексами и т.д.


(Roy Obenon) #6

или упростить и поставить задержку. При семафорах все равно будет ожидание пока первый отработает


(Artur Korobeynyk) #7

Ну и ещё вы понимаете ведь что каждый ваш поток при запуске будет обнулять текущий счетчик файлов в такой реализации? :wink:


(Eugene Tkachenko) #8

Вы правильно сказали, по-этому нужно создать какую-то темповую очередь из фолдеров и по ней ходить потоками, что бы потоки не ходили и не обнуляли уже пройденный фолдер другим потоком.


(Максим Таран) #9

Ага, иcпользуйте AtomicInteger если нужно общее количество файлов и инкрементируйте его из каждого потока. В данном случае объявлять его как volatile, вроде, не требуется. Просто static.


(Sergey Korol) #10

По сложному пути пошли. Вот quick sample на Java 8:

public static long walkThrough(final String path) {
    try (final Stream<Path> pathStream = Files.walk(Paths.get(path))) {
        return pathStream
                .filter(Files::isRegularFile)
                .count();
    } catch (IOException ignored) {
        return 0;
    }
}

Вызов:

Map<String, Long> items = Arrays.asList("path1", "path2", "pathN")
            .stream()
            .distinct()
            .collect(toMap(Function.identity(), HostClass::walkThrough));

Тут же можете поиграться с parallel() / parallelStream() и замерить performance. Хотя, очень сомневаюсь, что выиграете в производительности на маленьких выборках. Даже наоборот, скорее потеряете.

Если же надо просто посчитать общее число файлов:

long items = Arrays.asList("path1", "path2", "pathN")
        .stream()
        .distinct()
        .mapToLong(HostClass::walkThrough)
        .sum();

П.С. В случае со стримами, вам вряд ли удастся ошибиться с shared объектами.


(Максим Таран) #11

Единственно, что действительно это на java 8. Хотя с учётом того, что поддержка 7-ой прекращена пора всем перебираться. :smile:


(Roy Obenon) #12

нужно реализовывать самому?


(Sergey Korol) #13

Welcome to Oracle Javadocs world.


(Александр Таранков) #14

А нормально, что все потоки будут писать/читать в одну статическую переменную? Не лучше ли было избавиться от статики для начала?


(Максим Таран) #15

Это нормально. Главное синхронизировать :smile: Хотя можно было, сначала внутри всех потоков посчитать отдельно, а потом сложить.


(Sergey Korol) #16

Вот тут то и срабатывает принципиально ограничение стримов, которые не надут вам использовать не final / effectively final объекты из-вне в лямбдах. Это сделано как раз с той точки зрения, чтобы потоки не имели возможности изменять состояние mutable объектов, что привело бы к непредсказуемым последствиям при параллельном экзекьюшене. Конечно технически вы все-таки сможете изменять shared state какому-нибудь аккумулятору внутри объекта. Но это будут проблемы самого алгоритма, а не неверной работы потоков, в случае fail результатов. В Java 8 весь effort по многопоточности стримов реализован internally через fork/join framework. Но сами создатели предупреждают, что не следует бездумно включать опцию parallelStream(), т.к. при определенных условиях затраты на подготовку могут быть выше, нежели при последовательном запуске. Потому, в случае параллелизации желательно всегда делать замеры через JMH, проводить предварительную оптимизацию (по типу boxing / unboxing операций), избегать операций, зависящих от порядка (limit / findFirst), ordered коллекций, маленьких выборок т.п.


(Roy Obenon) #17

Я все таки добил (сделал чтобы одновременно считалось в каждом потоке)

public class CountFiles implements Runnable {
public FileFilter forFolder;
public FileFilter forFile;
private int countFilesToPrint;
private static List<String> list;
private String pathname;

public CountFiles(String name){
    this.pathname = name;
}

public int getCountFilesToPrint(){
    return countFilesToPrint;
}
public String getPath(){
    return pathname;
}

protected  void execute() {
    File file = new File(getPath());
    if (!file.isDirectory()) {
        System.out.println(file + " it's not a directory!");
        return;
    }

    if (!file.exists()){
        System.out.println(file + " is not exists!");
        return;
    }

    forFolder = new FileFilter() {

        @Override
        public boolean accept(File pathname) {
            return pathname.isDirectory();
        }
    };
    forFile = new FileFilter() {
        @Override
        public boolean accept(File pathname) {
            return pathname.isFile();
        }
    };

    this.executeFolder(file);
}

protected void executeFolder(File nameFolder){
    File[] file = nameFolder.listFiles(forFile);
    File[] folder = nameFolder.listFiles(forFolder);
    this.countFilesToPrint = this.countFilesToPrint + file.length;

    for (File file2 :  folder){
        executeFolder(file2);
    }
}

@Override
public void run() {
    execute();
    System.out.println(getPath() + " - " + getCountFilesToPrint());
}


public static void main(String[] args) throws InterruptedException {

    list = Arrays.asList("D:\\Load_Tools", "D:\\data", "D:\\Java");
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < list.size(); i++){
        executorService.execute(new CountFiles(list.get(i)));
    }
    executorService.shutdown();
    }
}

(Максим Таран) #18

Вы обращаетесь к переменной из нескольких потоков не пометив её как volatile. Вы проверили, оно точно считает?


(Roy Obenon) #19

Точно, я обращаюсь к переменной (а объекты то разные, каждый поток создает свой объект и непосредственно к полям этого объекта я и обращаюсь)


(Максим Таран) #20

ААА. Так он не суммарно считает. Да. понял. :smile: