Это то, что в конечном итоге сработало для меня:
import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.URI
val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfs_conf)
// source data in HDFS
val sourcePath = new Path("/<source_location>/<filename_pattern>")
hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
val filePathName = fileStatus.getPath().toString()
val fileName = fileStatus.getPath().getName()
// < DO STUFF HERE>
} // end foreach loop
Я посмотрел бы на использование некоторого типа Области транзакций / система Контекста. Таким образом, у Вас мог бы быть следующий код, который примерно основан на.Net & C#.
public class OrderService
{
public void CreateNewOrder(Order order, Customer customer)
{
//Set up our transactional boundary.
using (TransactionScope ts=new TransactionScope())
{
IOrderRepository orderRepos=GetOrderRespository();
orderRepos.SaveNew(order);
customer.Status=CustomerStatus.OrderPlaced;
ICustomerRepository customerRepository=GetCustomerRepository();
customerRepository.Save(customer)
ts.Commit();
}
}
}
TransactionScope может вложить так скажем, у Вас было действие, которое пересекло многочисленные услуги, Ваше приложение создаст TransactionScope также. Теперь в текущем .NET, если Вы используете TransactionScope, они сделали, чтобы Вы рискнули возрастать к DTC, но это будет разрешено в будущем.
Мы создали наш собственный класс TransactionScope, который в основном справился с нашими соединениями с БД и использовал локальные транзакции SQL.
Вы хотите посмотреть на реализацию шаблона единицы работы. Существуют реализации там для NHibernate. Каждый находится у Носорога проект палаты общин, существует также Машина. UoW.
Используя Spring.NET AOP + NHibernate можно записать класс репозитория как нормальный и настроить транзакции в пользовательском XML-файле:
public class CustomerService : ICustomerService
{
private readonly ICustomerRepository _customerRepository;
private readonly IOrderRepository _orderRepository;
public CustomerService(
ICustomerRepository customerRepository,
IOrderRepository orderRepository)
{
_customerRepository = customerRepository;
_orderRepository = orderRepository;
}
public int CreateOrder(Order o, Customer c)
{
// Do something with _customerRepository and _orderRepository
}
}
В XML-файле Вы выбираете, какие методы требуется быть выполненными в транзакции:
<object id="TxProxyConfigurationTemplate"
abstract="true"
type="Spring.Transaction.Interceptor.TransactionProxyFactoryObject, Spring.Data">
<property name="PlatformTransactionManager" ref="HibernateTransactionManager"/>
<property name="TransactionAttributes">
<name-values>
<add key="Create*" value="PROPAGATION_REQUIRED"/>
</name-values>
</property>
</object>
<object id="customerService" parent="TxProxyConfigurationTemplate">
<property name="Target">
<object type="MyNamespace.CustomerService, HibernateTest">
<constructor-arg name="customerRepository" ref="customerRepository" />
<constructor-arg name="orderRepository" ref="orderRepository" />
</object>
</property>
</object>
И в Вашем коде Вы получаете экземпляр класса CustomerService как это:
ICustomerService customerService = (ICustomerService)ContextRegistry
.GetContent()
.GetObject("customerService");
Spring.NET возвратит Вас прокси класса CustomerService, который применит транзакцию, когда Вы назовете метод CreateOrder. Таким образом, там не является никакая транзакция определенный код в Ваших классах обслуживания. AOP заботится о нем. Для получения дополнительной информации можно смотреть на документацию Spring.NET .
, Как я инкапсулирую сохранение больше чем одного объекта транзакционным способом с помощью шаблона репозитория? Например, что, если я хотел добавить порядок и обновить клиентское состояние на основе того создания порядка, но только сделать так если порядок, завершенный успешно? Следует иметь в виду, что для этого примера, заказы не являются набором в клиенте. Они - свой собственный объект.
не ответственность репозитория, обычно что-то сделанное в более высоком уровне. Хотя Вы сказали Ваш не заинтересованный определенными технологиями, я думаю, который стоит связать решения, например, при использовании NHibernate с веб-приложением, которого Вы, вероятно, рассмотрели бы использование сессия - на запрос .
Поэтому, если бы можно справиться с транзакциями в более высоком уровне затем, мои две опции были бы:
, Если Вы идете для второй опции затем, вопрос - то, что происходит с объектами в оперативной памяти, Вашего Клиента можно было бы оставить в непоследовательном состоянии. Если это будет иметь значение, и я работаю в сценариях, где это не делает, поскольку объект был только загружен в для того запроса, то я буду рассматривать первичную проверку, если его возможное, потому что это намного легче, чем альтернативы (откатывающий изменения в оперативной памяти или перезагружающий объекты).
Начальная загрузка моего компьютера этим утром, я столкнулся с точной проблемой для проекта, я продолжаю работать. У меня были некоторые идеи, которые приводят к следующему дизайну - и комментарии были бы более, чем потрясающими. К сожалению, дизайн, предложенный Josh, не возможен, поскольку я должен работать с удаленным SQL-сервером и не могу включить Распределять Координатору Транзакции сервис, на который он полагается.
Мое решение основано на нескольких все же простых изменениях в моем существующем коде.
Первый, у меня есть вся своя реализация репозиториев простой интерфейс маркера:
/// <summary>
/// A base interface for all repositories to implement.
/// </summary>
public interface IRepository
{ }
, Во-вторых, я позволяю включенным репозиториям всей своей транзакции, реализуют следующий интерфейс:
/// <summary>
/// Provides methods to enable transaction support.
/// </summary>
public interface IHasTransactions : IRepository
{
/// <summary>
/// Initiates a transaction scope.
/// </summary>
void BeginTransaction();
/// <summary>
/// Executes the transaction.
/// </summary>
void CommitTransaction();
}
идея состоит в том, что во всех моих репозиториях я реализую этот интерфейс и добавляю код, который представляет транзакцию непосредственно в зависимости от фактического поставщика (для поддельных репозиториев, я составил список делегатов, который выполняется на фиксации). Для LINQ к SQL было бы легко сделать реализации, такие как:
#region IHasTransactions Members
public void BeginTransaction()
{
_db.Transaction = _db.Connection.BeginTransaction();
}
public void CommitTransaction()
{
_db.Transaction.Commit();
}
#endregion
Это, конечно, требует, чтобы новый класс репозитория был создан для каждого потока, но это разумно для моего проекта.
Каждый метод с помощью репозитория должен вызвать BeginTransaction()
и EndTransaction()
, если репозиторий реализует IHasTransactions
. Для совершения этого еще легче вызова я придумал следующие расширения:
/// <summary>
/// Extensions for spawning and subsequently executing a transaction.
/// </summary>
public static class TransactionExtensions
{
/// <summary>
/// Begins a transaction if the repository implements <see cref="IHasTransactions"/>.
/// </summary>
/// <param name="repository"></param>
public static void BeginTransaction(this IRepository repository)
{
var transactionSupport = repository as IHasTransactions;
if (transactionSupport != null)
{
transactionSupport.BeginTransaction();
}
}
public static void CommitTransaction(this IRepository repository)
{
var transactionSupport = repository as IHasTransactions;
if (transactionSupport != null)
{
transactionSupport.CommitTransaction();
}
}
}
Комментарии ценятся!