Пишем буферизованный ввод-вывод с испоьзование планировщика ОС.

В моём первом посте про самопальную litenkjerne я обещал сделать буферизованный ввод-вывод в UART. Собственно, под Новый Год выполняю обещанное. Да, на этом примере можно реализовать и всё остальное, скажем, ADC/DAC, SPI, I2C и прочее.

Идея такова: есть кольцевой буфер (в примере он 16 байт, я отлаживал на 8, тоже работает без проблем), нить пишет в него данные (или читает), а обработчик прерывания забирает их оттуда (или кладёт). Всё хорошо, но фишка в том, что надо обычно вывести больше, чем размер кольцевого буфера, и тогда надо думать головой. Чтоб лишний раз головой не думать, надо как-то разрулить ситуацию. Разрулим её с помощью планировщика. Если кольцевой буфер кончился, а у нас остались данные, нужно подвесить нить. А когда отработает обработчик прерывания, он её возобновит. Посмотрим, как просто это сделано:
Вот обработчик прерывания отправки в UART:

#if defined(__IAR_SYSTEMS_ICC__)
#pragma vector = 19
#endif
INTERRUPT void uart_tx_isr (void)
#if defined(__RCSTM8__)
interrupt 17
#endif
{
  if(uart_tx_tail != uart_tx_head)
  {
    UART1->DR = uart_tx_bfr[uart_tx_tail];
    uart_tx_tail ++;
    if(uart_tx_tail >= UART_TX_SIZE) uart_tx_tail = 0;
  }
  else {
    UART1->CR2 &= ~UART1_CR2_TIEN; //disable tx reg empty interrupt
    if(uart_sleep_thread_tx) {
      krn_thread_unlock(uart_sleep_thread_tx);
      krn_thread_move(uart_sleep_thread_tx, krn_thread_current);
      uart_sleep_thread_tx = 0;
      //krn_dispatch(); //uncomment for extra hardness
    }
  }
}

В конце у него разблокировка нити и перепланирование её в очереди. Закомментированная строчка позволяет немедленно передать управление разблокированной нити.
Вот собственно, функция, в которой реализована запись:

void uart_write(char *bfr, int len)
{
  CRITICAL_STORE;
  int l;
  krn_mutex_lock(&uart_mutex_tx);
  while(len)
  {
    CRITICAL_START();
    l = uart_write_bfr(bfr, len);
    uart_start_tx();
    len -= l;
    if(len == 0) {
      CRITICAL_END();
      break;
    }
    bfr += l;
    uart_sleep_thread_tx = krn_thread_current;
    krn_thread_lock(uart_sleep_thread_tx);
    krn_dispatch();
    CRITICAL_END();
  }
  krn_mutex_unlock(&uart_mutex_tx);
}

В конце видна строчка, где она блокирует сама себя krn_thread_lock(uart_sleep_thread_tx); и запускает раунд планировщика krn_dispatch();.
Ну и, собственно, как оно используется:

//*
kout_u32h(g_str, f);
kout_uart("f=");
kout_uart(g_str);
kout_uart(" sec=");
kout_uart(kout_u32d(g_str + 12, krn_timer_cnt / KRN_FREQ));
kout_uart("\n");
//mode 1 demonstrates use of sliding ring buffer
//*
if(uart_rx_ev_get()) {
  j = uart_rx_get_len();
  uart_peek( g_str, j);
  kout_uart(" bfr=");
  uart_write(g_str, j);
  kout_uart("\n");
  uart_rx_ev_clr();
}
//*/

Тут что-то выводится, и зодно, если что-то пришло через терминал, выводится дамп кольцевого буфера, который читается в режиме peek (данные остаются в нём — в итоге как FIFO очередь)
В общем, думаю, понятно, что вытесняющая ОС позволяет легко вытворять довольно сложные вещи.
Если будете пробовать, можете убирать sleep'ы, будет динамичнее, но вывода входного буфера не увидете — пролетит слишком быстро.

UPD Добавлен буферизованный вывод для HD44780
  • +1
  • 31 декабря 2012, 20:24
  • scaldov

Комментарии (13)

RSS свернуть / развернуть
Не имею опыта работы с FreeRTOS и не в курсе знач5ения некоторых макросов. Попытаюсь пофантазировать. Имеем следующую систему:
— один низкоприоритетный поток периодически/по запросу выводит какую-то объемную информацию ~1.1 буффера…
— один высокоприоритетный поток, по редкому внешнему событию производит критическую по времени, относительно длительную ~1-5мс, обработку и выводит в конце краткий отчет по ней.
— UART настроен на 115200, дабы максимально сократить время ответа.
Прочие потоки не рассматриваем, считаем что они не пересекаются с рассматриваемой системой.
Идем по шагам.
1) происходить обработка низкоприоритетной задачи, которая вызывает запись в UART;
2) первый вызов полностью заполняет буффер и разрешает передачу;
3) в момент выполнения bfr += l; происходит высокоприоритетное событие, и происходит вытеснение низкоприоритетной задачи;
4) высокоприоритетная задача выполняет свои длительные вычисления, в это время в фоне происходит отправка данных;
5) по завершении отправки UART запрещается, и поскольку нет ожидающего потока (см. шаг 3), дополнительных обработок не происходит;
6) высокоприоритетный поток инициирует передачу по UART…
7) при попытке блокировать krn_mutex_lock(&uart_mutex_tx);происходит повышение приоритета и переключение низкоприоритетной задачи;
8) происходит установка «ожидающего потока»;
9) задача блокируется, навсегда (см. шаг 5).

P.S.: Не понятно назначение CRITICAL_STORE, что это и зачем?
0
CRITICAL_STORE это место где сохраняется флаги процессора.
потом прерывания запрещаются и то, что у вас расписано с шага 3) не происходит. В общем, всё отработано, как часы.
0
а, да, запрещаются они в CRITICAL_START
0
Тогда ясно. Смутил вызов krn_dispatch(); внутри критической секции. В моём представлении при её вызове управление передается, а не только планируется к передаче.
0
так и есть, передаётся. Но навсегда ничего не блокируется.
0
Да. Заворот всего цикла в krn_mutex_lock(&uart_mutex_tx); уж слишком агрессивное поведение. Если низкоприоритетный поток отправить на запись большой блок данных, то высокоприоритетный будет обязан ждать завершения всей операции записи. Да, это требуется для сохранения целостности всего блока данных, но как-то не по себе.

Снятие блокировки только при опустошении буфера тоже не самый производительный вариант. Если у нас не поместилось всего 1-2 символа, что, ждать завершения передачи всех 128 байт из буфера? Впрочем постоянно дергать планировщик, по каждому чиху, тоже плохо.
0
есть такая проблема. Но это уже проблема программиста, т.к. впихивать в такую мелкую ОСь IO scheduling — это уж сликом. Просто просчитайте примерно ввод-вывод во всех потоках. Прикинте скорость обмена и выведите из частоты, длины обмена и скорости канала длину кольцевого буфера. Чтоб ситуации блокировки высокоприоритетной нити из-за ввода-вывода не было.
0
Такое поведение можно расценивать как перекладывание ответственности с системного программиста на прикладного. А при этом ещё прикладной программист обязан знать, как там система работает внутрях, что тоже не хорошо.
0
неправда. вы представляете что такое приоретизация по IO? Это очень сложные алгоритмы. Например, как раздать приоритет? На основании мгновенного объёма данных или среднего? Среднего за какое время? И т.д. И кстати даже на десктопе, помнится, система всасывала, когда какой-нибудь процесс начинал насиловать винт. В XP точно так было, в 7ке не проверял, а в линуксе сделали — даже можно кино смотреть, когда винт мастурбируется.
0
так что, сэкономив ресурсов, сделаем проще. А программист не должен знать, что происходит внутри. Я написал, что он должен прикинуть, как работают его нити, и рассчитать размер кольцевого буфера.
0
Сложно это. Делаю int putchar(int) и int getchar(void). Если ресурс занят, то код возврата отрицательное число, иначе получаем в старшем байте ноль, а в младшем — отправленное в FIFO у putchar, ну и принятое из непустого FIFO приема по getchar. При возврате ошибки уходим (обычно у меня нечто а-ля Protothreads) заниматься более полезными делами. Отправку блока делаю со статическим указателем и декрементируемым счетчиком. При ненулевом счетчике попытка заправить блок на отправку даст код ошибки, т.е. имеем стартовалку, а подкачка живёт в обработчике прерывания передатчика. Если нужно соорудить какие-нибудь протокольные заморочки, то стартовалка получается чуть сложнее, ну и подкачивать приходится уже функцией, но опять же с отрицательным кодом возврата по концу FIFO. Если нужно, например для строба передачи по RS485, то по обнаружению пустоты FIFO делается засечка времени, ну и обычно либо в службе таймеров, либо в той же фоновой задаче, где отправлялся блок, отрабатывается строб по TxComplete. И шедулер ни разу лишнего движения не делает. Просто сколько ни читаю, везде идея с разделяемым ресурсом и как это окучить. Мне аж приходилось делить внешнюю память, которая на SPI, но я не делил между задачами ресурс, а завел сервис, который делит полосу при неединственном клиенте в очереди. UARTы же обычно отдаются одному-трём драйверам протоколов и те просто по своей сути монополизируют доступ, но этим рулит спулер. А если при этом всём ещё хочется на каком-нибудь MSP430 поспать, то мучать лишний раз планировщик стОит конкретных денег за емкость батареи. И тогда скорее лучше выбрать/выделить бОльший объём ОЗУ на буфер, чем на лишнюю TCB…
0
тоже так раньше делал отправку. теперь сделал так. шедулер тоже особо не дёргается.
раньше у меня (наверно как и у вас) в каждой нити был буфер, откуда UART отправлялка брала данные. тоже, как вы пишете, был тупо счётчик уменьшаемый. Теперь — один буфер, кольцевой который.

В-общем, хз. Вроде не намного сложнее вышло, чем раньше. Явных преимуществ тоже пока не видно, за исключением одного: поток обязательно выведет свои данные. По-вашему, он может пропустить вывод, если ресурс занят. С другой стороны, можно написать что-то типа tx_wait которая ждёт (через планировщик), пока освободится передатчик.

PS. а что, на MSP430 спать вредно?
0
У меня чаще в ожидании машина состояний(обычно тупо содранная из мануала) драйвера протокола. Вопрос скорее в том, что в моих реалиях не было случая, когда ресурс делится между задачами на ходу. Даже между CSD-мостом и отправлялкой данных по GPRS спокойно находится сильнейший и просто даунит слабую задачу. Потому что реальная процедура отправки имеет такой временной масштаб, что проще принять волевое решение на этапе проектирования прикладной программы, чем в код прикладной задачи замешивать системную часть. Я за максимальное разделение. При моем зоопарке камней хошь-не-хошь, а ещё и о портабельности кода думать приходится.
ЗЫ. Вредно не спать, а жрать:)
0
Только зарегистрированные и авторизованные пользователи могут оставлять комментарии.